Source code for cerebras.modelzoo.tools.checkpoint_converters.llava

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import math
import os
import re
from collections import OrderedDict
from typing import List, Tuple

import torch

from cerebras.modelzoo.tools.checkpoint_converters.base_converter import (
    BaseCheckpointConverter_HF_CS,
    BaseCheckpointConverter_UnpackedHF_PackedCS,
    BaseConfigConverter,
    BaseConfigConverter_HF_CS,
    BaseConfigConverter_UnpackedHF_PackedCS,
    ConversionRule,
    EquivalentSubkey,
    FormatIndices,
    FormatVersions,
)
from cerebras.modelzoo.tools.checkpoint_converters.clip_vit import (
    ConfigConverter_CLIPViT_HF_CS21,
    Converter_CLIPViT_Core_HF_CS21,
)
from cerebras.modelzoo.tools.checkpoint_converters.helper import (
    Build_HF_CS_Converter_WithOptionalModel,
)
from cerebras.modelzoo.tools.checkpoint_converters.llama import (
    ConfigConverter_LLaMa_HF_CS21,
    Converter_LlamaModel_HF_CS,
)


# HF `CLIPVisionModel` <-> CS `modeling_llava.LLaVA.image_model`
class Converter_LLaVA_CLIPViT_WithoutModel_HF_CS22(
    BaseCheckpointConverter_HF_CS
):
    def __init__(self):
        super().__init__()
        self.rules = [
            ConversionRule(
                [
                    EquivalentSubkey("vision_model.", "image_model."),
                    Converter_CLIPViT_Core_HF_CS21(),
                ],
            ),
            # To handle cases where the ckpt corresponds to CLIPModel
            # instead of CLIPVisionModel
            ConversionRule(["text_model.*"], action=None),
            ConversionRule(["logit_scale.*"], exists="left", action=None),
            # visual_projection and text_projection in HF
            ConversionRule(
                [r"visual_projection\.(?:weight|bias)"],
                action=None,
            ),
            ConversionRule(
                [r"text_projection\.(?:weight|bias)"],
                action=None,
            ),
            ConversionRule(
                ["projector_image_model.*"], exists="right", action=None
            ),
        ]

    def pre_checkpoint_convert(
        self,
        input_checkpoint,
        output_checkpoint,
        configs: Tuple[dict, dict],
        converter_indices: FormatIndices,
    ):
        # Normally this does output_checkpoint["model"] = {} and then we
        # reference output_checkpoint["model"] later in extract_model_dict.
        # We don't want to reset output_checkpoint["model"] here, though,
        # because we will store the keys under the same "model" key
        # created by this function during the component conversion.
        if converter_indices.direction == 0:
            pass

    @staticmethod
    def formats() -> Tuple[FormatVersions, FormatVersions]:
        return (FormatVersions("hf"), FormatVersions("cs-2.2", "cs-2.3"))

    @staticmethod
    def get_config_converter_class() -> BaseConfigConverter:
        return ConfigConverter_LLaVA_HF_CS22

class Converter_LLaVA_LLaMA_WithoutModel_HF_CS22(BaseCheckpointConverter_HF_CS):
    def __init__(self):
        super().__init__()
        self.rules = [
            ConversionRule([r"image_model.*"], exists="right", action=None),
            # match LM head here
            ConversionRule(
                [
                    EquivalentSubkey("", "text_model."),
                    r"lm_head\.(?:weight|bias)",
                ],
                action=self.replaceKey,
            ),
            ConversionRule(
                [
                    EquivalentSubkey("model.", "text_model."),
                    Converter_LlamaModel_HF_CS(),
                ],
            ),
            # projector_image_model
            ConversionRule(
                [
                    EquivalentSubkey(
                        "model.mm_projector", "projector_image_model.ffn"
                    ),
                    r"\.\d+",
                    EquivalentSubkey(".", ".linear_layer."),
                    r"(?:weight|bias)",
                ],
                action=self.convert_projector,
            ),
            # Ignore vision_tower keys if present in the LLaVA-LLaMA checkpoint,
            # since we are using separate checkpoints, i.e. a pretrained
            # checkpoint for vision_tower and a separate checkpoint for the
            # LLM and projector parts.
            ConversionRule(
                [
                    r"model.vision_tower.*",
                ],
                exists="left",
                action=None,
            ),
            # projector_text_model, if it exists
            ConversionRule(
                [r"projector_text_model.*"], exists="right", action=None
            ),
        ]

    def convert_projector(
        self,
        old_key,
        new_key,
        old_state_dict,
        new_state_dict,
        from_index,
        action_fn_args,
    ):
        layer_num_old = re.findall(r"\d+", old_key)
        layer_num_new = re.findall(r"\d+", new_key)
        assert (
            len(layer_num_old) == 1
        ), "Cannot have nested Sequential in model"
        assert (
            len(layer_num_new) == 1
        ), "Cannot have nested Sequential in model"
        if from_index == 0:
            new_key = new_key.replace(
                layer_num_new[0], str(int(layer_num_old[0]) // 2)
            )
        else:
            new_key = new_key.replace(
                layer_num_new[0], str(int(layer_num_old[0]) * 2)
            )
        new_state_dict[new_key] = old_state_dict[old_key]

    def pre_checkpoint_convert(
        self,
        input_checkpoint,
        output_checkpoint,
        configs: Tuple[dict, dict],
        converter_indices: FormatIndices,
    ):
        # Normally this does output_checkpoint["model"] = {} and then we
        # reference output_checkpoint["model"] later in extract_model_dict.
        # We don't want to reset output_checkpoint["model"] here, though,
        # because we will store the keys under the same "model" key
        # created by this function during the component conversion.
        if converter_indices.direction == 0:
            pass

    @staticmethod
    def formats() -> Tuple[FormatVersions, FormatVersions]:
        return (FormatVersions("hf"), FormatVersions("cs-2.2", "cs-2.3"))

    @staticmethod
    def get_config_converter_class() -> BaseConfigConverter:
        return ConfigConverter_LLaVA_HF_CS22
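
# Illustrative key mapping performed by `convert_projector` above (a sketch,
# assuming the usual `mlpNx_gelu` projector layout in which the HF
# `nn.Sequential` places Linear layers at even indices with activations at odd
# indices, while the CS `FeedForwardNetwork` numbers its linear layers
# consecutively):
#
#   HF: model.mm_projector.0.weight  <->  CS: projector_image_model.ffn.0.linear_layer.weight
#   HF: model.mm_projector.2.weight  <->  CS: projector_image_model.ffn.1.linear_layer.weight
#
# i.e. the layer index is divided by 2 going HF -> CS and multiplied by 2
# going CS -> HF.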

Converter_LLaVA_CLIPViT_HF_CS22 = Build_HF_CS_Converter_WithOptionalModel(
    "Converter_LLaVA_CLIPViT_HF_CS22",
    Converter_LLaVA_CLIPViT_WithoutModel_HF_CS22,
    derived_class=Converter_LLaVA_CLIPViT_WithoutModel_HF_CS22,
)

Converter_LLaVA_LLaMA_HF_CS22 = Build_HF_CS_Converter_WithOptionalModel(
    "Converter_LLaVA_LLaMA_HF_CS22",
    Converter_LLaVA_LLaMA_WithoutModel_HF_CS22,
    derived_class=Converter_LLaVA_LLaMA_WithoutModel_HF_CS22,
)

class Converter_LLaVA_WithoutModel_HF_CS22(
    BaseCheckpointConverter_UnpackedHF_PackedCS
):
    def __init__(self):
        super().__init__()
        self.rules = [
            ConversionRule(["image_model.*"], exists="right", action=None),
            ConversionRule(["text_model.*"], exists="right", action=None),
            ConversionRule(
                ["projector_image_model.*"], exists="right", action=None
            ),
            ConversionRule(
                ["projector_text_model.*"], exists="right", action=None
            ),
        ]

    @staticmethod
    def converters():
        return (
            Converter_LLaVA_CLIPViT_HF_CS22,
            Converter_LLaVA_LLaMA_HF_CS22,
        )

    @staticmethod
    def component_names():
        return ("image_model", "text_model")

    def post_checkpoint_convert(
        self,
        input_checkpoint,
        output_checkpoint,
        configs: Tuple[dict, dict],
        converter_indices: FormatIndices,
    ):
        if converter_indices.direction == 0:  # HF -> CS
            # We are converting from HF to our LLaVA model. We need to create
            # the visual token `projection` layer and initialize it to default
            # values for phase 1.
            is_projector_exists = any(
                "projector_image_model" in k
                for k in output_checkpoint["model"].keys()
            )
            if not is_projector_exists:
                logging.info(
                    "---- HF checkpoint does not have projector weights, "
                    "initializing defaults"
                )
                cs_config = configs[1]
                im_proj_config = cs_config["model"]["projector"]["image_model"]
                input_unit = im_proj_config["input_unit"]
                layers_units = im_proj_config["layers_units"]
                use_bias = im_proj_config["use_bias"]

                input_ = [input_unit] + layers_units[:-1]
                output_ = layers_units
                for i, (inp, out) in enumerate(zip(input_, output_)):
                    scale = math.sqrt(1.0 / inp)
                    projection_weight = torch.zeros(out, inp)
                    projection_weight.uniform_(-scale, scale)
                    output_checkpoint["model"][
                        f"projector_image_model.ffn.{i}.linear_layer.weight"
                    ] = projection_weight
                    if use_bias:
                        projection_bias = torch.zeros(out)
                        projection_bias.uniform_(-scale, scale)
                        output_checkpoint["model"][
                            f"projector_image_model.ffn.{i}.linear_layer.bias"
                        ] = projection_bias

        super().post_checkpoint_convert(
            input_checkpoint,
            output_checkpoint,
            configs,
            converter_indices,
        )

    @staticmethod
    def formats() -> Tuple[FormatVersions, FormatVersions]:
        return (FormatVersions("hf"), FormatVersions("cs-2.2", "cs-2.3"))

    @staticmethod
    def architectures() -> Tuple[List[str], str]:
        return (
            (
                "ViTModel",
                "LlamaModel",
            ),
            "LLaVAModel",
        )

    @staticmethod
    def get_config_converter_class() -> BaseConfigConverter:
        return ConfigConverter_LLaVA_HF_CS22

    @classmethod
    def converter_note(cls) -> str:
        note = super().converter_note()
        return (
            note
            + "LLaVA converter for CLIP-ViT and LLaMA backbones "
            "for `image_model` and `text_model`. "
            f"In order to use the converter {cls.formats()[0]} -> {cls.formats()[1]}, "
            "the CLIP-ViT checkpoint, config, and preprocessor_config should be "
            "saved under the `image_model` directory, and the LLaMA checkpoint, "
            "including tokenizer files, should be saved under the `text_model` "
            "directory. "
            f"Also, the converter from {cls.formats()[0]} -> {cls.formats()[1]} "
            "expects the `config.json` file for the `text_model` to include "
            "LLaVA-specific config parameters. The easiest way is to download "
            "the LLaVA `config.json` and modify the necessary parameters to "
            "reflect the LLaMA checkpoint being used. "
            "Please refer to modelzoo/models/multimodal/llava/README.md "
            "for an example setup."
        )
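
# Example of the default projector initialization performed above (a sketch
# with illustrative dimensions; the actual values come from
# `model.projector.image_model` in the CS config):
#
#   input_unit = 1024, layers_units = [4096, 4096], use_bias = True
#
#   projector_image_model.ffn.0.linear_layer.weight : (4096, 1024)
#   projector_image_model.ffn.0.linear_layer.bias   : (4096,)
#   projector_image_model.ffn.1.linear_layer.weight : (4096, 4096)
#   projector_image_model.ffn.1.linear_layer.bias   : (4096,)
#
# Each tensor is drawn from U(-1/sqrt(fan_in), 1/sqrt(fan_in)), matching the
# default `torch.nn.Linear` initialization bound.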

Converter_LLaVA_HF_CS22 = Build_HF_CS_Converter_WithOptionalModel(
    "Converter_LLaVA_HF_CS22",
    Converter_LLaVA_WithoutModel_HF_CS22,
    derived_class=Converter_LLaVA_WithoutModel_HF_CS22,
)
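
# Expected on-disk layout for the HF -> CS direction described in
# `converter_note` above (an illustrative sketch; checkpoint file names are
# examples of standard HF naming, not requirements of this converter):
#
#   <hf_checkpoint_dir>/
#       image_model/              # CLIP-ViT component
#           config.json
#           preprocessor_config.json
#           pytorch_model.bin
#       text_model/               # LLaMA component + projector weights
#           config.json           # must include the LLaVA-specific keys
#                                 # (mm_projector_type, mm_hidden_size, ...)
#           tokenizer.model
#           pytorch_model.bin
#
# See modelzoo/models/multimodal/llava/README.md for a full example setup.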

class ConfigConverter_LLaVA_HF_CS22(BaseConfigConverter_UnpackedHF_PackedCS):
    # HF preprocessor config
    preprocessor_config_defaults = {
        "crop_size": 224,
        "do_center_crop": True,
        "do_normalize": True,
        "do_resize": True,
        "feature_extractor_type": "CLIPFeatureExtractor",
        "image_mean": [0.48145466, 0.4578275, 0.40821073],
        "image_std": [0.26862954, 0.26130258, 0.27577711],
        "resample": 3,
        "size": 224,
    }

    def __init__(self):
        super().__init__()
        self.rules = []

        # CS config
        self.post_convert_defaults[1].update(
            {
                "loss_weight": 1.0,
                "loss_scaling": "num_tokens",
                "freeze": ['^image_model'],
                "label_smoothing": 0.0,
                "z_loss_eps": 0.0,
                "image_start_idx": 1,
                "image_feature_select_mode": "patch",
            }
        )

    @classmethod
    def converter_note(cls) -> str:
        return (
            "LLaVA converter for CLIP-ViT and LLaMA backbones "
            "for `image_model` and `text_model`. "
            f"In order to use the converter {cls.formats()[0]} -> {cls.formats()[1]}, "
            "the CLIP-ViT checkpoint, config, and preprocessor_config should be "
            "saved under the `image_model` directory, and the LLaMA checkpoint, "
            "including tokenizer files, should be saved under the `text_model` "
            "directory. "
            f"Also, the converter from {cls.formats()[0]} -> {cls.formats()[1]} "
            "expects the `config.json` file for the `text_model` to include "
            "LLaVA-specific config parameters. The easiest way is to download "
            "the LLaVA `config.json` and modify the necessary parameters to "
            "reflect the LLaMA checkpoint being used. "
            "Please refer to modelzoo/models/multimodal/llava/README.md "
            "for an example setup."
        )

    @classmethod
    def save(
        cls,
        file_without_ext: str,
        config: OrderedDict,
        converter_indices: FormatIndices,
        **kwargs,
    ) -> str:
        # Saving CS requires only saving once
        if converter_indices.direction == 0:
            return super().save(
                file_without_ext, config, converter_indices, **kwargs
            )
        # Saving HF requires separating the encoders and saving both
        else:
            save_files = []
            dir = os.path.dirname(file_without_ext)
            for i, name in enumerate(cls.component_names()):
                path = os.path.join(dir, name, "config")
                if not os.path.exists(os.path.join(dir, name)):
                    os.mkdir(os.path.join(dir, name))
                if name == "text_model":
                    # Add the path to the folder containing the image model
                    # to the text_model config
                    config[i]["mm_vision_tower"] = os.path.dirname(
                        save_files[0]
                    )
                if name == "image_model":
                    preprocess_path = path.replace(
                        "config", "preprocessor_config"
                    )
                    # Save the preprocessor config after the dir is created
                    BaseConfigConverter_HF_CS.save(
                        preprocess_path,
                        cls.preprocessor_config_defaults,
                        converter_indices,
                        **kwargs,
                    )
                save_file = BaseConfigConverter_HF_CS.save(
                    path, config[i], converter_indices, **kwargs
                )
                save_files.append(save_file)
            return save_files

    def post_config_convert(
        self,
        original_config,
        old_config,
        new_config,
        converter_indices,
        drop_unmatched_keys,
    ):
        """
        new_config: List[Dict] if converter_indices = 1 (CS -> HF),
        else Dict if converter_indices = 0 (HF -> CS)
        """
        new_config = super().post_config_convert(
            original_config,
            old_config,
            new_config,
            converter_indices,
            drop_unmatched_keys,
        )

        if converter_indices.direction == 0:  # src_fmt: HF -> tgt_fmt: CS
            # old_config = List[configs] where index i
            # corresponds to the ith entry in component_names
            new_image_config = new_config["model"]["image_model"]
            new_image_config["name"] = "ViTModel"
            # remove non-kwargs
            new_image_config.pop("mixed_precision")
            new_image_config.pop("num_classes")
            new_image_config.pop("use_bias_in_output")

            new_text_config = new_config["model"]["text_model"]
            new_text_config["name"] = "LlamaModel"
            new_text_config.pop("mixed_precision")

            new_config["model"]["image_feature_select_mode"] = (
                new_text_config.pop("image_feature_select_mode")
            )

            # We are doing this to get "projector_image_model" under the
            # "projector" key in the CS yaml.
            # Convert `mm_projector_type` here since we depend on other
            # values in the config.
            mm_hidden_size = new_text_config.pop("mm_hidden_size")
            assert (
                mm_hidden_size == new_image_config["hidden_size"]
            ), "`mm_hidden_size` should be the same as the hidden_dim of mm_vision_tower"
            new_projector_config = new_text_config.pop("projector")
            hf_projector_type = new_projector_config.pop("hf_type")
            num_linear = int(
                re.match(r"mlp(\d+)x_gelu", hf_projector_type).group(1)
            )
            new_im_proj_config = new_projector_config["image_model"]
            new_im_proj_config["name"] = "FeedForwardNetwork"
            new_im_proj_config["input_unit"] = mm_hidden_size  # image_model
            new_im_proj_config["layers_units"] = [
                new_text_config["hidden_size"]
            ] * num_linear  # text_model
            # We write `gelu` here since the input HF config
            # has `mlp2x_gelu` and LLaVA hardcodes `gelu`.
            new_im_proj_config["layers_activation"] = ["gelu"] * (
                num_linear - 1
            ) + [None]
            new_im_proj_config["use_bias"] = True
            new_config["model"]["projector"] = new_projector_config

            # Add other params at the `model` level for CS
            new_config["model"]["freeze"] = new_text_config.pop("freeze")
            new_config["model"]["image_feature_select_layer_idx"] = (
                new_text_config.pop("image_feature_select_layer_idx")
            )
            new_config["model"]["image_model"].pop("fp16_type", None)
        else:  # CS -> HF
            # new_config is the HF config = List[configs] where index i
            # corresponds to the ith entry in component_names.
            # LLaVA model init on HF works only when
            # there is a preprocessor config.
            self.preprocessor_config_defaults.update(
                {
                    "crop_size": {
                        "height": old_config["image_model"]["image_size"][0],
                        "width": old_config["image_model"]["image_size"][1],
                    },
                    "size": old_config["image_model"]["image_size"][0],
                }
            )
        return new_config

    @staticmethod
    def formats() -> Tuple[FormatVersions, FormatVersions]:
        return (FormatVersions("hf"), FormatVersions("cs-2.2", "cs-2.3"))

    @staticmethod
    def converters():
        return (
            ConfigConverter_CLIPViT_HF_CS21,
            ConfigConverter_LLaMaProjector_HF_CS22,
        )

    @staticmethod
    def component_names():
        return (
            "image_model",
            "text_model",
        )

    def pre_config_convert(
        self,
        config,
        converter_indices,
    ):
        """
        config: List[dicts] if converter_indices = 0 (HF -> CS),
        else dict (CS -> HF)
        """
        if converter_indices.direction == 0:  # HF -> CS
            # To avoid asserts with BaseConfigConverter.assert_factory_fn
            config[0]["model_type"] = "clip_vision_model"
            config[1]["model_type"] = "llama"
            if "vision_config" in config[0]:
                config[0] = config[0]["vision_config"]
        else:  # CS -> HF
            # Move the projector config into the text_model config
            # for CS in order to match keys
            projector_config = config["model"].pop("projector")
            config["model"]["text_model"]["projector"] = projector_config
            config["model"]["text_model"]["freeze"] = config["model"].pop(
                "freeze"
            )
            config["model"]["text_model"]["image_feature_select_layer_idx"] = (
                config["model"].pop("image_feature_select_layer_idx")
            )
            config["model"]["text_model"]["image_feature_select_mode"] = config[
                "model"
            ].pop("image_feature_select_mode")
            config["model"]["text_model"]["mm_hidden_size"] = config["model"][
                "image_model"
            ]["hidden_size"]

        return super().pre_config_convert(config, converter_indices)

class ConfigConverter_LLaMaProjector_HF_CS22(ConfigConverter_LLaMa_HF_CS21):
    def __init__(self):
        super().__init__()
        projector_rules = [
            ConversionRule(
                [EquivalentSubkey("mm_projector_type", "projector")],
                action=self.convert_mm_projector_type,
            ),
            ConversionRule(
                ["mm_hidden_size"],
                action=self.replaceKey,
            ),
            ConversionRule(
                [
                    EquivalentSubkey(
                        "mm_vision_select_feature", "image_feature_select_mode"
                    )
                ],
                action=self.replaceKey,
            ),
            ConversionRule(
                [
                    EquivalentSubkey(
                        "mm_vision_select_layer",
                        "image_feature_select_layer_idx",
                    )
                ],
                action=self.convert_mm_vision_select_feature,
            ),
            ConversionRule(
                ["mm_use_im_start_end"],
                exists="left",
                action=BaseConfigConverter.assert_factory_fn(0, False),
            ),
            ConversionRule(
                ["mm_use_im_patch_token"],
                exists="left",
                action=BaseConfigConverter.assert_factory_fn(0, False),
            ),
            ConversionRule(
                [EquivalentSubkey("tune_mm_mlp_adapter", "freeze")],
                action=self.convert_tune_mm_mlp_adapter,
            ),
        ]
        self.rules = self.rules + projector_rules

        # HF
        self.pre_convert_defaults[0].update(
            {
                "mm_vision_select_feature": "patch",
                "mm_use_im_patch_token": False,
                "mm_use_im_start_end": False,
                "tie_word_embeddings": False,
                "rope_scaling": None,
                "unfreeze_mm_vision_tower": False,
                "tune_mm_vision_resampler": False,
                "tune_mm_mlp_adapter": False,
                "mm_vision_select_layer": -2,
                "mm_projector_type": "mlp2x_gelu",
                "mm_hidden_size": 64,
            }
        )
        # CS
        # text model
        self.pre_convert_defaults[1].update(
            {
                "share_embedding_weights": False,
                "use_bias_in_output": False,
            }
        )
        # HF
        self.post_convert_defaults[0].update(
            {
                "mm_use_im_patch_token": False,
                "mm_use_im_start_end": False,
                "bos_token_id": 1,
                "eos_token_id": 2,
                "image_aspect_ratio": "pad",
                "freeze_mm_mlp_adapter": False,
                "freeze_mm_vision_resampler": False,
                "model_type": "llava",
                "architectures": ["LlavaLlamaForCausalLM"],
                "pad_token_id": 0,
                "tune_mm_mlp_adapter": False,
                "tune_mm_vision_resampler": False,
                "unfreeze_mm_vision_tower": False,
                "use_cache": True,
            }
        )

    def convert_mm_vision_select_feature(
        self,
        old_key,
        new_key,
        old_state_dict,
        new_state_dict,
        from_index,
        action_fn_args,
    ):
        old_val = old_state_dict[old_key]
        if old_val < 0:
            new_state_dict[new_key] = old_val
        else:
            if from_index == 0:  # HF -> CS
                # When HF outputs hidden states, it also includes embeddings:
                # https://github.com/huggingface/transformers/blob/v4.38.1/src/transformers/models/clip/modeling_clip.py#L79
                # Also, LLaVA directly uses this value:
                # https://github.com/haotian-liu/LLaVA/blob/main/llava/model/multimodal_encoder/clip_encoder.py#L36
                assert old_val != 0, "value = 0 will get embeddings"
                new_state_dict[new_key] = old_val - 1
            else:
                new_state_dict[new_key] = old_val + 1

    def convert_mm_projector_type(
        self,
        old_key,
        new_key,
        old_state_dict,
        new_state_dict,
        from_index,
        action_fn_args,
    ):
        if from_index == 0:  # HF -> CS
            assert (
                re.match(r"mlp(\d+)x_gelu", old_state_dict["mm_projector_type"])
                is not None
            ), r"Converter only valid when `mm_projector_type` is of pattern `mlp(\d+)x_gelu`, got {}".format(
                old_state_dict["mm_projector_type"]
            )
            # old_state_dict would be a list with index i
            # corresponding to component i in
            # `ConfigConverter_LLaVA_HF_CS21`
            new_state_dict[new_key] = {}
            new_state_dict[new_key]["image_model"] = {}
            new_state_dict[new_key]["hf_type"] = old_state_dict[old_key]
        else:  # CS -> HF
            assert (
                len(old_state_dict[old_key]["image_model"]) != 0
            ), "CS model should have a non-empty `projector.image_model`"
            proj_name = old_state_dict[old_key]["image_model"]["name"]
            _msg = (
                f"CS model projector.image_model.name should be of type "
                f"`FeedForwardNetwork` in order to convert to HF, got {proj_name}"
            )
            assert proj_name == "FeedForwardNetwork", _msg
            act = old_state_dict[old_key]["image_model"]["layers_activation"]
            expected_act = ["gelu"] * (len(act) - 1) + [None]
            assert (
                act == expected_act
            ), f"Cannot support {act}, expected value = {expected_act}"
            new_state_dict[new_key] = "mlp{}x_gelu".format(len(act))

    def convert_tune_mm_mlp_adapter(
        self,
        old_key,
        new_key,
        old_state_dict,
        new_state_dict,
        from_index,
        action_fn_args,
    ):
        if from_index == 0:  # HF -> CS
            # Freeze modules appropriately
            old_key_val = old_state_dict[old_key]
            new_state_dict[new_key] = ['^image_model']
            if old_key_val:
                new_state_dict[new_key].append('^text_model')
        else:
            # HF `tune_mm_mlp_adapter`: True  -> CS `freeze`: ["image_model", "text_model"]
            # HF `tune_mm_mlp_adapter`: False -> CS `freeze`: ["image_model"]
            old_val = old_state_dict[old_key]
            if "text_model" in old_val:
                new_state_dict[new_key] = True
            else:
                new_state_dict[new_key] = False

    @staticmethod
    def formats() -> Tuple[FormatVersions, FormatVersions]:
        return (
            FormatVersions("hf"),
            FormatVersions("cs-2.2", "cs-2.3"),
        )

    def post_config_convert(
        self,
        original_config,
        old_config,
        new_config,
        converter_indices,
        drop_unmatched_keys,
    ):
        return super().post_config_convert(
            original_config,
            old_config,
            new_config,
            converter_indices,
            drop_unmatched_keys,
        )
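
# Summary of the HF <-> CS config mappings implemented by
# `ConfigConverter_LLaMaProjector_HF_CS22` above (a sketch of the rules, not
# an exhaustive list):
#
#   HF `mm_projector_type: "mlp2x_gelu"`
#       <-> CS `projector.image_model` as a FeedForwardNetwork with two linear
#           layers and `layers_activation: ["gelu", None]`
#   HF `mm_vision_select_layer`
#       <-> CS `image_feature_select_layer_idx` (negative indices pass through
#           unchanged; non-negative HF indices are shifted by -1 because HF
#           hidden states include the embedding output at index 0)
#   HF `tune_mm_mlp_adapter: True`  -> CS `freeze: ['^image_model', '^text_model']`
#   HF `tune_mm_mlp_adapter: False` -> CS `freeze: ['^image_model']`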