from pydantic import BaseModel from langchain_core.prompts import ChatPromptTemplate from langchain_openai import AzureChatOpenAI from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type ) from typing import Dict # ❌ REMOVED: from externals.observability.langfuse import langfuse_handler, langfuse from services.llms.LLM import model_5mini, model_4omini from utils.decorator import trace_runtime from utils.logger import get_logger logger = get_logger("base generator") class MetadataObservability(BaseModel): fullname: str task_id: str agent: str class BaseAIGenerator: def __init__(self, task_name: str, prompt: ChatPromptTemplate, input_llm: Dict, metadata_observability: MetadataObservability, llm: AzureChatOpenAI = model_5mini | model_4omini, ): self.name = task_name self.llm = llm self.prompt = prompt self.input_llm = input_llm self.metadata_observability = metadata_observability def _get_langfuse_handler(self): try: import os from config.constant import LangfuseConstants # adjust import path if needed os.environ["LANGFUSE_PUBLIC_KEY"] = LangfuseConstants.PUBLIC_KEY os.environ["LANGFUSE_SECRET_KEY"] = LangfuseConstants.SECRET_KEY os.environ["LANGFUSE_HOST"] = LangfuseConstants.HOST or "https://us.cloud.langfuse.com" from langfuse.langchain import CallbackHandler return CallbackHandler() except Exception as e: logger.warning(f"⚠️ Langfuse unavailable, skipping observability: {e}") return None @retry( reraise=True, stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=5), retry=retry_if_exception_type(Exception) ) async def _asafe_invoke(self, chain, input_llm, config): return await chain.ainvoke(input_llm, config=config) @retry( reraise=True, stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=5), retry=retry_if_exception_type(Exception) ) async def _safe_invoke(self, chain, input_llm, config): return chain.invoke(input_llm, config=config) @trace_runtime async def agenerate(self): try: handler = self._get_langfuse_handler() config = {"callbacks": [handler]} if handler else {} chain = self.prompt | self.llm output = await self._asafe_invoke( chain=chain, input_llm=self.input_llm, config=config, ) return output except Exception as e: logger.exception("❌ BaseGenerator agenerate error") return None @trace_runtime async def generate(self): try: handler = self._get_langfuse_handler() config = {"callbacks": [handler]} if handler else {} chain = self.prompt | self.llm output = self._safe_invoke( chain=chain, input_llm=self.input_llm, config=config, ) return output except Exception as e: logger.exception("❌ BaseGenerator generate error") return None