MudabbirAI / mcp_servers.py
youssefleb's picture
Update mcp_servers.py
a4f27db verified
raw
history blame
13.1 kB
# mcp_servers.py (FIXED: Schema Enforcement + Detailed Logging + Usage Tracking)
import asyncio
import json
import re
import google.generativeai as genai
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
from typing import Dict, Optional, Tuple, List, Any
import config
from utils import load_prompt
from personas import PERSONAS_DATA
# Evaluator prompt template, loaded once at import time from the file named by
# config.PROMPT_FILES["evaluator"]. BusinessSolutionEvaluator treats a template
# containing "ERROR:" as a failed load and fills the template with
# .format(problem=..., solution_text=...).
EVALUATION_PROMPT_TEMPLATE = load_prompt(config.PROMPT_FILES["evaluator"])
# --- Response schema for the judge ---
# Every metric is an object carrying a mandatory integer "score" and a
# mandatory string "justification"; all five metrics are themselves required.
EVALUATION_SCHEMA = {
    "type": "OBJECT",
    "properties": {
        metric_name: {
            "type": "OBJECT",
            "properties": {
                "score": {"type": "INTEGER"},
                "justification": {"type": "STRING"},
            },
            "required": ["score", "justification"],
        }
        for metric_name in (
            "Novelty",
            "Usefulness_Feasibility",
            "Flexibility",
            "Elaboration",
            "Cultural_Appropriateness",
        )
    },
    "required": [
        "Novelty",
        "Usefulness_Feasibility",
        "Flexibility",
        "Elaboration",
        "Cultural_Appropriateness",
    ],
}
def extract_json(text: str) -> dict:
    """Extract a JSON value from an LLM response.

    Strategy, in order:
      1. If the text contains a Markdown code fence (```json or ```), parse
         the content of the first fenced block; otherwise parse the whole
         stripped text.
      2. If that fails, scan the raw text for the first ``{`` from which a
         complete JSON object can be decoded. Trailing prose after the object
         (even prose containing braces) is ignored.

    Args:
        text: Raw model output.

    Returns:
        The decoded JSON value (normally a dict; a bare JSON array or scalar
        is returned as-is when the whole payload parses in step 1).

    Raises:
        ValueError: If ``text`` is not a string or no JSON could be decoded.
    """
    if not isinstance(text, str):
        raise ValueError(f"Could not extract JSON from response: {str(text)[:100]}...")

    candidate = text.strip()
    if "```json" in candidate:
        candidate = candidate.split("```json")[1].split("```")[0].strip()
    elif "```" in candidate:
        candidate = candidate.split("```")[1].split("```")[0].strip()

    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass  # fall through to the brace scan below

    # raw_decode stops at the end of the first complete JSON value, so text
    # following the object cannot break the parse the way a greedy regex can.
    decoder = json.JSONDecoder()
    for brace in re.finditer(r"\{", text):
        try:
            parsed, _end = decoder.raw_decode(text, brace.start())
        except json.JSONDecodeError:
            continue
        return parsed

    raise ValueError(f"Could not extract JSON from response: {text[:100]}...")
class BusinessSolutionEvaluator:
    """Judge that asks a Gemini model to score a solution on five metrics."""

    def __init__(self, gemini_client: Optional[genai.GenerativeModel]):
        """Store the Gemini model and verify the evaluator prompt was loaded.

        Raises:
            ValueError: If no Gemini client is supplied.
            FileNotFoundError: If the loaded prompt template contains "ERROR:".
        """
        if not gemini_client:
            raise ValueError("BusinessSolutionEvaluator requires a Google/Gemini client.")
        self.gemini_model = gemini_client

        if "ERROR:" in EVALUATION_PROMPT_TEMPLATE:
            raise FileNotFoundError(EVALUATION_PROMPT_TEMPLATE)

    async def evaluate(self, problem: str, solution_text: str) -> Tuple[dict, dict]:
        """Score ``solution_text`` for ``problem``.

        Returns:
            ``(evaluation, usage)``. ``usage`` holds the model label and the
            input/output token counts. If anything fails while calling the
            model or parsing its reply, every metric is returned with score 1
            and the error text as justification.
        """
        print(f"Evaluating solution (live): {solution_text[:50]}...")

        rendered_prompt = EVALUATION_PROMPT_TEMPLATE.format(
            problem=problem,
            solution_text=solution_text,
        )
        format_rules = """
[IMPORTANT SYSTEM INSTRUCTION]
Ignore any previous examples of JSON formatting in this prompt.
You MUST strictly follow the Output Schema provided below.
For EACH of the 5 metrics (Novelty, Usefulness_Feasibility, etc.), you must provide an object with TWO fields:
1. "score": An integer from 1 to 5.
2. "justification": A specific sentence explaining why you gave that score.
Do not output a list. Return a single JSON object describing the solution above.
"""
        judge_prompt = rendered_prompt + format_rules
        token_usage = {"model": "Gemini", "input": 0, "output": 0}

        try:
            response = await self.gemini_model.generate_content_async(
                judge_prompt,
                generation_config=genai.types.GenerationConfig(
                    response_mime_type="application/json",
                    response_schema=EVALUATION_SCHEMA,
                ),
            )

            # Record token counts when the response exposes them.
            if hasattr(response, "usage_metadata"):
                token_usage["input"] = response.usage_metadata.prompt_token_count
                token_usage["output"] = response.usage_metadata.candidates_token_count

            verdict = extract_json(response.text)
            if isinstance(verdict, (dict, list)):
                print(f"Evaluation complete (live): {verdict}")
                return verdict, token_usage
            raise ValueError(f"Judge returned invalid type: {type(verdict)}")
        except Exception as e:
            print(f"ERROR: BusinessSolutionEvaluator failed: {e}")
            reason = f"Error: {str(e)}"
            # Lowest score on every metric, carrying the failure reason.
            failed_verdict = {
                metric_name: {"score": 1, "justification": reason}
                for metric_name in (
                    "Novelty",
                    "Usefulness_Feasibility",
                    "Flexibility",
                    "Elaboration",
                    "Cultural_Appropriateness",
                )
            }
            return failed_verdict, token_usage
class AgentCalibrator:
    """Choose an LLM for each team role from a short live calibration run.

    Each enabled LLM answers a brief test prompt once per role. The evaluator
    scores every answer and, per role, the LLM with the highest score on that
    role's configured metric is selected.
    """

    # Roles that make up the returned team plan.
    _TEAM_ROLES = ("Plant", "Implementer", "Monitor")

    def __init__(self, api_clients: dict, evaluator: "BusinessSolutionEvaluator"):
        """Keep the evaluator and only those clients that are configured.

        Args:
            api_clients: Mapping of LLM name to client object. Falsy clients
                are treated as disabled and dropped.
            evaluator: Judge used to score calibration outputs.
        """
        self.evaluator = evaluator
        self.api_clients = {name: client for name, client in api_clients.items() if client}
        self.sponsor_llms = list(self.api_clients.keys())
        print(f"AgentCalibrator initialized with enabled clients: {self.sponsor_llms}")

    @staticmethod
    def _extract_metric_score(raw_score_data: Any, metric: str) -> float:
        """Return the numeric score for ``metric``, or 0 if the data is malformed.

        The judge normally returns ``{metric: {"score": int, ...}}`` but may
        wrap either level in a list. Anything that cannot be interpreted as a
        number yields 0 so that one bad evaluation cannot crash calibration.
        """
        if isinstance(raw_score_data, list):
            raw_score_data = raw_score_data[0] if raw_score_data else {}
        if not isinstance(raw_score_data, dict):
            return 0

        metric_data = raw_score_data.get(metric, {})
        if isinstance(metric_data, list):
            metric_data = metric_data[0] if metric_data else {}
        if not isinstance(metric_data, dict):
            return 0

        score = metric_data.get("score", 0)
        if isinstance(score, bool):
            return 0  # a boolean is not a meaningful score
        if isinstance(score, (int, float)):
            return score
        try:
            return float(score)  # tolerate numeric strings such as "4"
        except (TypeError, ValueError):
            return 0

    async def calibrate_team(self, problem: str) -> Tuple[Dict[str, Any], List[str], List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Run the calibration and build the team plan.

        Args:
            problem: Business problem used for the calibration prompt.

        Returns:
            ``(team_plan, error_log, detailed_results, all_usage_stats)``.
            With a single enabled LLM, calibration is skipped and the last two
            lists are empty.

        Raises:
            Exception: If no LLM clients are configured.
        """
        print(f"Running LIVE calibration test for specialist team on {self.sponsor_llms}...")
        error_log = []
        all_usage_stats = []  # Collect all usage data here

        if not self.sponsor_llms:
            raise Exception("AgentCalibrator cannot run: No LLM clients are configured.")

        personas_by_role = config.CALIBRATION_CONFIG["roles_to_test"]

        if len(self.sponsor_llms) == 1:
            default_llm = self.sponsor_llms[0]
            print("Only one LLM available. Skipping calibration.")
            plan = {
                role: {"persona": personas_by_role[role], "llm": default_llm}
                for role in self._TEAM_ROLES
            }
            return plan, error_log, [], []

        roles_to_test = {
            role: PERSONAS_DATA[key]["description"]
            for role, key in personas_by_role.items()
        }
        test_problem = f"For the business problem '{problem}', generate a single, brief, one-paragraph concept-level solution."

        # One calibration run per (role, LLM) pair, executed concurrently.
        tasks = [
            self.run_calibration_test(problem, role, llm_name, persona, test_problem)
            for role, persona in roles_to_test.items()
            for llm_name in self.sponsor_llms
        ]
        results = await asyncio.gather(*tasks)
        detailed_results = results

        # Flatten results to extract usage
        for res in results:
            if "usage_gen" in res:
                all_usage_stats.append(res["usage_gen"])
            if "usage_eval" in res:
                all_usage_stats.append(res["usage_eval"])

        best_llms = {}
        role_metrics = config.CALIBRATION_CONFIG["role_metrics"]
        for role in roles_to_test:
            best_score = -1
            # Fallback when every run for this role failed.
            best_llm = self.sponsor_llms[0]
            for res in results:
                if res["role"] != role:
                    continue
                if res.get("error"):
                    error_log.append(f"Calibration failed for {res['llm']} (as {role}): {res['error']}")
                    continue
                score = self._extract_metric_score(res.get("score", {}), role_metrics[role])
                if score > best_score:
                    best_score = score
                    best_llm = res["llm"]
            best_llms[role] = best_llm

        team_plan = {
            role: {"persona": personas_by_role[role], "llm": best_llms[role]}
            for role in self._TEAM_ROLES
        }
        print(f"Calibration complete (live). Team plan: {team_plan}")
        return team_plan, error_log, detailed_results, all_usage_stats

    async def run_calibration_test(self, problem, role, llm_name, persona, test_problem):
        """Generate one test solution with ``llm_name`` and have it evaluated.

        Returns:
            A dict with ``role``, ``llm``, ``output`` and ``usage_gen``. On a
            generation failure it also carries ``error``; otherwise it carries
            ``score`` and ``usage_eval``.
        """
        print(f"...Calibrating {role} on {llm_name}...")
        client = self.api_clients[llm_name]

        # 1. Generate Solution (and get usage)
        solution, gen_usage = await get_llm_response(llm_name, client, persona, test_problem)
        if not isinstance(solution, str):
            # A provider may hand back no text at all; report it as a failed
            # run instead of letting a TypeError abort the whole gather().
            solution = f"Error generating response from {llm_name}: empty or non-text response"
        if "Error generating response" in solution:
            return {"role": role, "llm": llm_name, "error": solution, "output": solution, "usage_gen": gen_usage}

        # 2. Evaluate Solution (and get usage)
        score, eval_usage = await self.evaluator.evaluate(problem, solution)
        return {
            "role": role,
            "llm": llm_name,
            "score": score,
            "output": solution,
            "usage_gen": gen_usage,
            "usage_eval": eval_usage,
        }
# --- Unified API Call Function ---
async def get_llm_response(client_name: str, client, system_prompt: str, user_prompt: str) -> Tuple[str, dict]:
    """Send one system+user prompt to the named provider.

    Args:
        client_name: Provider label; "Gemini", "Anthropic" and "SambaNova" are
            supported.
        client: The provider's async client (for Gemini, the model object).
        system_prompt: Persona / system instruction.
        user_prompt: The user message.

    Returns:
        ``(text, usage)`` where ``usage`` is
        ``{"model": client_name, "input": <tokens>, "output": <tokens>}``.
        The function never raises: on any failure, including an unsupported
        ``client_name`` or a reply with no text, ``text`` is a message
        starting with "Error generating response from <client_name>" and
        ``usage`` holds whatever counts were captured before the failure.
    """
    usage = {"model": client_name, "input": 0, "output": 0}
    try:
        if client_name == "Gemini":
            # Gemini has no system role here, so the persona is primed as a
            # first user turn followed by a canned model acknowledgement.
            full_prompt = [
                {'role': 'user', 'parts': [system_prompt]},
                {'role': 'model', 'parts': ["Understood. I will act as this persona."]},
                {'role': 'user', 'parts': [user_prompt]},
            ]
            response = await client.generate_content_async(full_prompt)
            # Capture Gemini Usage
            metadata = getattr(response, "usage_metadata", None)
            if metadata is not None:
                usage["input"] = metadata.prompt_token_count
                usage["output"] = metadata.candidates_token_count
            text = response.text
        elif client_name == "Anthropic":
            response = await client.messages.create(
                model=config.MODELS["Anthropic"]["default"],
                max_tokens=8192,
                system=system_prompt,
                messages=[{"role": "user", "content": user_prompt}],
            )
            # Capture Anthropic Usage
            reported = getattr(response, "usage", None)
            if reported is not None:
                usage["input"] = reported.input_tokens
                usage["output"] = reported.output_tokens
            text = response.content[0].text
        elif client_name == "SambaNova":
            completion = await client.chat.completions.create(
                model=config.MODELS["SambaNova"]["default"],
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
            )
            # Capture SambaNova Usage (the field can be present but None)
            reported = getattr(completion, "usage", None)
            if reported is not None:
                usage["input"] = reported.prompt_tokens
                usage["output"] = reported.completion_tokens
            text = completion.choices[0].message.content
        else:
            # Without this branch the function would fall through and return
            # None, breaking callers that unpack the (text, usage) tuple.
            raise ValueError(f"unsupported LLM client '{client_name}'")

        if text is None:
            raise ValueError("provider returned no text content")
        return text, usage
    except Exception as e:
        error_message = f"Error generating response from {client_name}: {str(e)}"
        print(f"ERROR: API call to {client_name} failed: {e}")
        return error_message, usage