# NOTE: removed extraction artifact (UI status lines "Spaces:" / "Running"
# / "Running") that is not valid Python and was not part of the module.
| from pydantic import BaseModel | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_openai import AzureChatOpenAI | |
| from tenacity import ( | |
| retry, | |
| stop_after_attempt, | |
| wait_exponential, | |
| retry_if_exception_type | |
| ) | |
| from typing import Dict | |
| # ❌ REMOVED: from externals.observability.langfuse import langfuse_handler, langfuse | |
| from services.llms.LLM import model_5mini, model_4omini | |
| from utils.decorator import trace_runtime | |
| from utils.logger import get_logger | |
# Module-level logger for this generator module.
logger = get_logger("base generator")
class MetadataObservability(BaseModel):
    """Metadata attached to a generation run for observability purposes.

    Stored on ``BaseAIGenerator`` but not read anywhere in this file —
    presumably consumed by an external tracing layer (e.g. Langfuse);
    verify against callers.
    """
    # Full name identifying the run's owner/initiator — TODO confirm semantics.
    fullname: str
    # Identifier of the task this generation belongs to.
    task_id: str
    # Name of the agent performing the generation.
    agent: str
class BaseAIGenerator:
    """Run a LangChain ``prompt | llm`` chain with optional Langfuse tracing.

    The chain is rebuilt on every call as ``self.prompt | self.llm``.
    All failures are logged and swallowed: both ``generate`` and
    ``agenerate`` return ``None`` on error instead of raising.
    """

    def __init__(self,
                 task_name: str,
                 prompt: ChatPromptTemplate,
                 input_llm: Dict,
                 metadata_observability: MetadataObservability,
                 # NOTE(review): ``model_5mini | model_4omini`` builds a
                 # *sequence* — the first model's output is fed as input to
                 # the second, i.e. two LLM calls per invoke. If a fallback
                 # was intended, that would be
                 # ``model_5mini.with_fallbacks([model_4omini])`` — confirm.
                 llm: AzureChatOpenAI = model_5mini | model_4omini,
                 ):
        self.name = task_name
        self.llm = llm
        self.prompt = prompt
        self.input_llm = input_llm
        # Stored but not read in this module; see MetadataObservability.
        self.metadata_observability = metadata_observability

    def _get_langfuse_handler(self):
        """Return a Langfuse ``CallbackHandler`` or ``None`` if unavailable.

        Credentials are pushed into the process environment on every call
        (the Langfuse SDK reads them from there). Any failure — missing
        package, missing config — is logged as a warning and swallowed so
        observability can never block generation.
        """
        try:
            import os
            from config.constant import LangfuseConstants  # adjust import path if needed
            os.environ["LANGFUSE_PUBLIC_KEY"] = LangfuseConstants.PUBLIC_KEY
            os.environ["LANGFUSE_SECRET_KEY"] = LangfuseConstants.SECRET_KEY
            os.environ["LANGFUSE_HOST"] = LangfuseConstants.HOST or "https://us.cloud.langfuse.com"
            from langfuse.langchain import CallbackHandler
            return CallbackHandler()
        except Exception as e:
            logger.warning(f"⚠️ Langfuse unavailable, skipping observability: {e}")
            return None

    async def _asafe_invoke(self, chain, input_llm, config):
        # Async path: delegates to the chain's native async invocation.
        return await chain.ainvoke(input_llm, config=config)

    async def _safe_invoke(self, chain, input_llm, config):
        # NOTE(review): this calls the chain's *sync* ``invoke`` from an
        # async method, so it blocks the event loop for the duration of the
        # LLM call — consider ``asyncio.to_thread`` if that matters here.
        return chain.invoke(input_llm, config=config)

    async def agenerate(self):
        """Asynchronously run the chain; return the LLM output or ``None``."""
        try:
            handler = self._get_langfuse_handler()
            config = {"callbacks": [handler]} if handler else {}
            chain = self.prompt | self.llm
            output = await self._asafe_invoke(
                chain=chain,
                input_llm=self.input_llm,
                config=config,
            )
            return output
        except Exception:
            logger.exception("❌ BaseGenerator agenerate error")
            return None

    async def generate(self):
        """Run the chain via its sync ``invoke``; return output or ``None``.

        BUG FIX: ``_safe_invoke`` is a coroutine function, but it was
        previously called without ``await`` — callers received an
        un-awaited coroutine object instead of the LLM output, and the
        invoke never actually executed. It is now awaited.
        """
        try:
            handler = self._get_langfuse_handler()
            config = {"callbacks": [handler]} if handler else {}
            chain = self.prompt | self.llm
            output = await self._safe_invoke(
                chain=chain,
                input_llm=self.input_llm,
                config=config,
            )
            return output
        except Exception:
            logger.exception("❌ BaseGenerator generate error")
            return None