| import transformers |
| from transformers import AutoModelForCausalLM, AutoProcessor, AutoConfig, QuantoConfig, GenerationConfig |
| import torch |
| import safetensors |
| import argparse |
| import os |
| import json |
| from PIL import Image |
|
|
| """ |
Usage:
    export SAFETENSORS_FAST_GPU=1
    python main.py --quant_type int8 --world_size 8 --model_id <model_path> --image_path <image_path>
| """ |
|
|
def generate_quanto_config(hf_config: "AutoConfig", quant_type: str):
    """Build the quantization config for the requested quantization mode.

    Args:
        hf_config: Model configuration. Only
            ``hf_config.text_config.num_hidden_layers`` is read, and only
            for the ``"int8"`` mode.
        quant_type: ``"default"`` for no quantization, or ``"int8"`` for
            int8 weight quantization through ``QuantoConfig``.

    Returns:
        ``None`` for ``"default"``; otherwise a ``QuantoConfig`` with
        ``weights="int8"`` whose ``modules_to_not_convert`` lists the modules
        that must stay unquantized.

    Raises:
        KeyError: If ``quant_type`` is not one of the supported modes.
    """
    # Nothing to build for the unquantized path, so do not touch hf_config.
    if quant_type == "default":
        return None
    if quant_type != "int8":
        raise KeyError(quant_type)

    num_layers = hf_config.text_config.num_hidden_layers
    modules_to_not_convert = [
        "vision_tower",
        "image_newline",
        "multi_modal_projector",
        "lm_head",
        "embed_tokens",
    ]
    # Per-layer modules that are excluded from quantization as well.
    modules_to_not_convert += [
        f"model.layers.{i}.coefficient" for i in range(num_layers)
    ]
    modules_to_not_convert += [
        f"model.layers.{i}.block_sparse_moe.gate" for i in range(num_layers)
    ]
    return QuantoConfig(
        weights="int8",
        modules_to_not_convert=modules_to_not_convert,
    )
|
|
def parse_args():
    """Parse the command-line options of the inference script.

    Returns:
        argparse.Namespace with the attributes ``quant_type``, ``model_id``,
        ``world_size`` and ``image_path``.
    """
    parser = argparse.ArgumentParser(
        description="Describe an image with a vision-language checkpoint "
        "whose decoder layers are spread across several CUDA devices.",
    )
    parser.add_argument(
        "--quant_type",
        type=str,
        default="default",
        choices=["default", "int8"],
        help="Weight quantization mode (default: no quantization).",
    )
    parser.add_argument(
        "--model_id",
        type=str,
        required=True,
        help="Checkpoint directory; must contain model.safetensors.index.json.",
    )
    parser.add_argument(
        "--world_size",
        type=int,
        required=True,
        help="Number of CUDA devices to distribute the decoder layers over.",
    )
    parser.add_argument(
        "--image_path",
        type=str,
        required=True,
        help="Path of the image to describe.",
    )
    return parser.parse_args()
|
|
def check_params(args, hf_config: "AutoConfig"):
    """Validate the CLI options against the model configuration.

    Args:
        args: Parsed options; ``quant_type`` and ``world_size`` are read.
        hf_config: Model configuration; ``text_config.num_hidden_layers``
            is read.

    Raises:
        ValueError: If ``world_size`` is not positive, if int8 quantization
            is requested with fewer than 8 GPUs, or if the number of hidden
            layers is not divisible by ``world_size``.
    """
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    if args.world_size < 1:
        raise ValueError(
            f"world_size({args.world_size}) must be a positive integer"
        )

    if args.quant_type == "int8" and args.world_size < 8:
        raise ValueError("int8 weight-only quantization requires at least 8 GPUs")

    num_layers = hf_config.text_config.num_hidden_layers
    if num_layers % args.world_size != 0:
        raise ValueError(
            f"num_hidden_layers({num_layers}) must be divisible by world_size({args.world_size})"
        )
|
|
def _build_device_map(weight_map, num_hidden_layers, world_size):
    """Map module names to CUDA devices for a layer-wise model split.

    Args:
        weight_map: The ``weight_map`` dict of the checkpoint index
            (parameter name -> shard file); only the keys are used.
        num_hidden_layers: Number of decoder layers of the language model.
        world_size: Number of CUDA devices to distribute the layers over.

    Returns:
        dict mapping module name -> ``'cuda:<index>'``.
    """
    vision_markers = ('vision_tower', 'image_newline', 'multi_modal_projector')
    last_device = f'cuda:{world_size - 1}'

    # Embeddings sit on the first device, final norm and LM head on the last.
    device_map = {
        'language_model.model.embed_tokens': 'cuda:0',
        'language_model.model.norm': last_device,
        'language_model.lm_head': last_device,
    }

    # All vision-side modules go to the first device. The parameter suffix is
    # dropped so that the map is keyed by module name, not parameter name.
    for param_name in weight_map:
        if any(marker in param_name for marker in vision_markers):
            module_name = param_name.replace('.weight', '').replace('.bias', '')
            device_map[module_name] = 'cuda:0'
    device_map['vision_tower.vision_model.post_layernorm'] = 'cuda:0'

    # Consecutive decoder layers are split into equally sized groups, one
    # group per device.
    layers_per_device = num_hidden_layers // world_size
    for device_idx in range(world_size):
        for offset in range(layers_per_device):
            layer_idx = device_idx * layers_per_device + offset
            device_map[f'language_model.model.layers.{layer_idx}'] = f'cuda:{device_idx}'
    return device_map


@torch.no_grad()
def main():
    """Load the model across several GPUs and describe one image.

    Steps: parse and print the CLI options, load config and processor,
    validate the options, derive a device map from the checkpoint index,
    build the chat prompt and model inputs, load the (optionally quantized)
    model, generate up to 100 new tokens and print the decoded response.
    """
    args = parse_args()
    print("\n=============== Argument ===============")
    for key, value in vars(args).items():
        print(f"{key}: {value}")
    print("========================================")

    model_id = args.model_id

    # NOTE: trust_remote_code=True executes Python code shipped with the
    # checkpoint; only use checkpoints from a trusted source.
    hf_config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
    # Validate right after the config is known, before any heavier loading.
    check_params(args, hf_config)
    processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
    quantization_config = generate_quanto_config(hf_config, args.quant_type)

    index_path = os.path.join(model_id, "model.safetensors.index.json")
    with open(index_path, "r", encoding="utf-8") as f:
        weight_map = json.load(f)['weight_map']
    device_map = _build_device_map(
        weight_map,
        hf_config.text_config.num_hidden_layers,
        args.world_size,
    )

    messages = [
        {"role": "system", "content": [{"type": "text", "text": "You are a helpful assistant created by Minimax based on MiniMax-VL-01 model."}]},
        {"role": "user", "content": [{"type": "image", "image": "placeholder"}, {"type": "text", "text": "Describe this image."}]},
    ]
    prompt = processor.tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    print(f"prompt: \n{prompt}")

    # The context manager closes the image file once the processor has
    # converted it into tensors.
    with Image.open(args.image_path) as raw_image:
        model_inputs = (
            processor(images=[raw_image], text=prompt, return_tensors='pt')
            .to('cuda')
            .to(torch.bfloat16)
        )

    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype="bfloat16",
        device_map=device_map,
        quantization_config=quantization_config,
        trust_remote_code=True,
        offload_buffers=True,
    )
    generation_config = GenerationConfig(
        max_new_tokens=100,
        # NOTE(review): presumably the end-of-sequence token id of this
        # model's tokenizer -- confirm against the checkpoint.
        eos_token_id=200020,
        use_cache=True,
    )
    generated_ids = model.generate(**model_inputs, generation_config=generation_config)
    print(f"generated_ids: {generated_ids}")

    # Drop the prompt tokens so that only the newly generated part is decoded.
    new_token_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = processor.tokenizer.batch_decode(new_token_ids, skip_special_tokens=True)[0]
    print(response)
| |
|
|
def query_safetensors(path):
    """Print the name and shape of every tensor in a safetensors file.

    Args:
        path: Path of the ``.safetensors`` file to inspect.
    """
    # ``safe_open`` is exported by the top-level ``safetensors`` package, so
    # this does not depend on some other module having imported the
    # ``safetensors.torch`` submodule. Tensors are read one at a time instead
    # of loading the whole file into memory at once.
    with safetensors.safe_open(path, framework="pt") as f:
        for key in f.keys():
            print(key, f.get_tensor(key).shape)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()