Spaces:
Runtime error
Runtime error
| # generation_utils.py | |
| from threading import Thread | |
| from time import perf_counter | |
| from typing import List | |
| import gradio as gr | |
| from transformers import AutoTokenizer, TextIteratorStreamer | |
| import numpy as np | |
| import os | |
def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
    """
    Gets the token ID for a given string that has been added to the tokenizer as a special token.

    Args:
        tokenizer (PreTrainedTokenizer): the tokenizer
        key (str): the key to convert to a single token

    Raises:
        ValueError: if the key does not encode to exactly one token ID
            (either several IDs or none at all)

    Returns:
        int: the token ID for the given key
    """
    token_ids = tokenizer.encode(key)
    # Exactly one ID is required. An empty encoding would otherwise surface as an
    # IndexError on the indexing below, which callers handling ValueError would miss.
    if len(token_ids) != 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]
def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
    tokenizer=None,
) -> tuple:
    """
    Helper function for performance estimation

    NOTE(review): a function with this same name is defined again further down in
    this file; at import time the later definition is the one that stays bound.

    Parameters:
      current_time (float): This step time in seconds.
      current_perf_text (str): Current content of performance UI field.
      new_gen_text (str): New generated text.
      per_token_time (List[float]): history of performance from previous steps.
        Mutated in place: the tokens/s rate of this step is appended.
      num_tokens (int): Total number of generated tokens.
      tokenizer: Object with an ``encode(text)`` method used to count the tokens of
        ``new_gen_text``. When omitted, a module-level ``tokenizer`` is used if one
        exists.
    Returns:
      update for performance text field
      update for a total number of tokens
    Raises:
      NameError: if no tokenizer was passed and no module-level ``tokenizer`` exists.
    """
    if tokenizer is None:
        # Backward compatibility: the function used to rely on a global `tokenizer`.
        tokenizer = globals().get("tokenizer")
        if tokenizer is None:
            raise NameError(
                "estimate_latency needs a tokenizer: pass `tokenizer=` or define a module-level `tokenizer`"
            )

    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks

    # A step with no measurable duration has no meaningful rate; count its tokens
    # but do not record a sample (and avoid dividing by zero).
    if current_time <= 0:
        return current_perf_text, num_tokens
    per_token_time.append(num_current_toks / current_time)

    # Refresh the displayed text only every 4th sample once more than 10 exist.
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        # NOTE(review): this averages everything EXCEPT the 10 most recent samples;
        # a sliding window over the last 10 would be `[-10:]` - confirm the intent.
        current_bucket = per_token_time[:-10]
        return (
            f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
            num_tokens,
        )
    return current_perf_text, num_tokens
def run_generation(
    user_text: str,
    top_p: float,
    temperature: float,
    top_k: int,
    max_new_tokens: int,
    perf_text: str,
    tokenizer: AutoTokenizer,
    tokenizer_kwargs: dict,
    model_configuration: dict,
    ov_model,
) -> tuple:
    """
    Text generation function

    This is a generator: generation runs on a worker thread and the text produced
    so far is yielded after every chunk received from the streamer.

    Parameters:
      user_text (str): User-provided instruction for generation.
      top_p (float): Nucleus sampling. If < 1, keeps smallest set of most probable tokens.
      temperature (float): Modulates logits distribution.
      top_k (int): Number of highest probability vocabulary tokens to keep for top-k-filtering.
      max_new_tokens (int): Maximum length of generated sequence.
      perf_text (str): Content of text field for performance results.
      tokenizer (AutoTokenizer): The tokenizer object.
      tokenizer_kwargs (dict): Additional kwargs for tokenizer. ``None`` is treated as empty.
      model_configuration (dict): Configuration for the model. The keys read are
        "response_key", "prompt_template" (defaults to "{instruction}") and "end_key".
      ov_model: Your OpenVINO model object; its ``generate`` method is called.
    Yields:
      model_output (str): Model-generated text accumulated so far.
      perf_text (str): Updated performance text.
    Returns:
      The final (model_output, perf_text) pair, as the generator's return value.
    """
    # Extract necessary configurations from model_configuration
    response_key = model_configuration.get("response_key")
    prompt_template = model_configuration.get("prompt_template", "{instruction}")
    end_key = model_configuration.get("end_key")

    # Handle special tokens: the end key is only looked up when the tokenizer
    # knows a special token starting with the response key.
    end_key_token_id = None
    if response_key and end_key:
        tokenizer_response_key = next(
            (token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
            None,
        )
        if tokenizer_response_key:
            try:
                end_key_token_id = get_special_token_id(tokenizer, end_key)
            except ValueError:
                # Best effort: the end key is not a single token, fall back to EOS below.
                pass

    # Ensure defaults for token IDs. Compare against None explicitly: 0 is a
    # valid token ID and must not be treated as "missing".
    if end_key_token_id is None:
        end_key_token_id = tokenizer.eos_token_id
    pad_token_id = end_key_token_id if end_key_token_id is not None else tokenizer.pad_token_id

    # Prepare input prompt according to model expected template
    prompt_text = prompt_template.format(instruction=user_text)

    # Tokenize the user text.
    model_inputs = tokenizer(prompt_text, return_tensors="pt", **(tokenizer_kwargs or {}))

    # Start generation on a separate thread, so that we don't block the UI.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        **model_inputs,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "top_p": top_p,
        "temperature": float(temperature),
        "top_k": top_k,
        "eos_token_id": end_key_token_id,
        "pad_token_id": pad_token_id,
    }
    t = Thread(target=ov_model.generate, kwargs=generate_kwargs)
    t.start()

    # Pull the generated text from the streamer and update model output
    model_output = ""
    per_token_time = []
    num_tokens = 0
    start = perf_counter()
    for new_text in streamer:
        current_time = perf_counter() - start
        model_output += new_text
        perf_text, num_tokens = estimate_latency(
            current_time,
            perf_text,
            new_text,
            per_token_time,
            num_tokens,
            tokenizer=tokenizer,
        )
        yield model_output, perf_text
        # Restart the clock after the consumer resumes us, so that time spent
        # outside this generator is not attributed to the next chunk.
        start = perf_counter()

    # The streamer is exhausted; wait for the worker thread to finish.
    t.join()
    return model_output, perf_text


def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
    tokenizer=None,
) -> tuple:
    """
    Helper function for performance estimation

    NOTE(review): a function with this same name is also defined earlier in this
    file; this later definition is the one that stays bound at import time.

    Parameters:
      current_time (float): This step time in seconds.
      current_perf_text (str): Current content of performance UI field.
      new_gen_text (str): New generated text.
      per_token_time (List[float]): history of performance from previous steps.
        Mutated in place: the tokens/s rate of this step is appended.
      num_tokens (int): Total number of generated tokens.
      tokenizer: Object with an ``encode(text)`` method used to count the tokens of
        ``new_gen_text``. When omitted, a module-level ``tokenizer`` is used if one
        exists.
    Returns:
      update for performance text field
      update for a total number of tokens
    Raises:
      NameError: if no tokenizer was passed and no module-level ``tokenizer`` exists.
    """
    if tokenizer is None:
        # Backward compatibility: the function used to rely on a global `tokenizer`.
        tokenizer = globals().get("tokenizer")
        if tokenizer is None:
            raise NameError(
                "estimate_latency needs a tokenizer: pass `tokenizer=` or define a module-level `tokenizer`"
            )

    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks

    # A step with no measurable duration has no meaningful rate; count its tokens
    # but do not record a sample (and avoid dividing by zero).
    if current_time <= 0:
        return current_perf_text, num_tokens
    per_token_time.append(num_current_toks / current_time)

    # Refresh the displayed text only every 4th sample once more than 10 exist.
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        # NOTE(review): this averages everything EXCEPT the 10 most recent samples;
        # a sliding window over the last 10 would be `[-10:]` - confirm the intent.
        current_bucket = per_token_time[:-10]
        return (
            f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
            num_tokens,
        )
    return current_perf_text, num_tokens
def reset_textbox(instruction: str, response: str, perf: str):
    """
    Clear the content of all text fields.

    The incoming values are ignored; they are accepted only so the function can
    be given the current content of the three fields.

    Parameters:
      instruction (str): Content of user instruction field.
      response (str): Content of model response field.
      perf (str): Content of performance info field.
    Returns:
      a tuple of three empty strings, one for each field
    """
    cleared = ""
    return cleared, cleared, cleared