CandidateExplorer / services /base /BaseGenerator.py
ishaq101's picture
[NOTICKET] Fix langfuse version, testing langfuse
e22b3b4
from pydantic import BaseModel
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import AzureChatOpenAI
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
from typing import Dict
# ❌ REMOVED: from externals.observability.langfuse import langfuse_handler, langfuse
from services.llms.LLM import model_5mini, model_4omini
from utils.decorator import trace_runtime
from utils.logger import get_logger
logger = get_logger("base generator")
class MetadataObservability(BaseModel):
fullname: str
task_id: str
agent: str
class BaseAIGenerator:
def __init__(self,
task_name: str,
prompt: ChatPromptTemplate,
input_llm: Dict,
metadata_observability: MetadataObservability,
llm: AzureChatOpenAI = model_5mini | model_4omini,
):
self.name = task_name
self.llm = llm
self.prompt = prompt
self.input_llm = input_llm
self.metadata_observability = metadata_observability
def _get_langfuse_handler(self):
try:
import os
from config.constant import LangfuseConstants # adjust import path if needed
os.environ["LANGFUSE_PUBLIC_KEY"] = LangfuseConstants.PUBLIC_KEY
os.environ["LANGFUSE_SECRET_KEY"] = LangfuseConstants.SECRET_KEY
os.environ["LANGFUSE_HOST"] = LangfuseConstants.HOST or "https://us.cloud.langfuse.com"
from langfuse.langchain import CallbackHandler
return CallbackHandler()
except Exception as e:
logger.warning(f"⚠️ Langfuse unavailable, skipping observability: {e}")
return None
@retry(
reraise=True,
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=1, max=5),
retry=retry_if_exception_type(Exception)
)
async def _asafe_invoke(self, chain, input_llm, config):
return await chain.ainvoke(input_llm, config=config)
@retry(
reraise=True,
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=1, max=5),
retry=retry_if_exception_type(Exception)
)
async def _safe_invoke(self, chain, input_llm, config):
return chain.invoke(input_llm, config=config)
@trace_runtime
async def agenerate(self):
try:
handler = self._get_langfuse_handler()
config = {"callbacks": [handler]} if handler else {}
chain = self.prompt | self.llm
output = await self._asafe_invoke(
chain=chain,
input_llm=self.input_llm,
config=config,
)
return output
except Exception as e:
logger.exception("❌ BaseGenerator agenerate error")
return None
@trace_runtime
async def generate(self):
try:
handler = self._get_langfuse_handler()
config = {"callbacks": [handler]} if handler else {}
chain = self.prompt | self.llm
output = self._safe_invoke(
chain=chain,
input_llm=self.input_llm,
config=config,
)
return output
except Exception as e:
logger.exception("❌ BaseGenerator generate error")
return None