# mcp_servers.py (FIXED: Added OpenAI & Nebius Support to get_llm_response)
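"""Model-provider adapters and LLM-as-judge calibration utilities.

Exposes:
- BusinessSolutionEvaluator: scores a candidate solution on five creativity metrics via Gemini.
- AgentCalibrator: benchmarks every enabled LLM per specialist role and assembles a team plan.
- get_llm_response: a single async helper covering Gemini, Anthropic, and
  OpenAI-compatible endpoints (SambaNova, OpenAI, Nebius).
"""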
import asyncio
import json
import re
import google.generativeai as genai
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
from typing import Dict, Optional, Tuple, List, Any
import config
from utils import load_prompt
from personas import PERSONAS_DATA

EVALUATION_PROMPT_TEMPLATE = load_prompt(config.PROMPT_FILES["evaluator"])

# Schema definition for the structured evaluation response (Gemini response_schema format):
# each metric maps to an object with an integer "score" and a string "justification".
_METRIC_SCHEMA = {
    "type": "OBJECT",
    "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}},
    "required": ["score", "justification"],
}

EVALUATION_SCHEMA = {
    "type": "OBJECT",
    "properties": {
        "Novelty": _METRIC_SCHEMA,
        "Usefulness_Feasibility": _METRIC_SCHEMA,
        "Flexibility": _METRIC_SCHEMA,
        "Elaboration": _METRIC_SCHEMA,
        "Cultural_Appropriateness": _METRIC_SCHEMA,
    },
    "required": ["Novelty", "Usefulness_Feasibility", "Flexibility", "Elaboration", "Cultural_Appropriateness"],
}
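
# Example of the shape the judge is expected to return (values are illustrative only):
# {
#   "Novelty": {"score": 7, "justification": "..."},
#   "Usefulness_Feasibility": {"score": 6, "justification": "..."},
#   "Flexibility": {"score": 5, "justification": "..."},
#   "Elaboration": {"score": 6, "justification": "..."},
#   "Cultural_Appropriateness": {"score": 8, "justification": "..."}
# }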

def extract_json(text: str) -> dict:
    """Extract a JSON object from an LLM response, tolerating ``` fences and surrounding prose."""
    try:
        clean_text = text.strip()
        if "```json" in clean_text:
            clean_text = clean_text.split("```json")[1].split("```")[0].strip()
        elif "```" in clean_text:
            clean_text = clean_text.split("```")[1].split("```")[0].strip()
        return json.loads(clean_text)
    except (json.JSONDecodeError, IndexError):
        # Fall back to grabbing the outermost {...} block from the raw text.
        match = re.search(r'(\{[\s\S]*\})', text)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                pass
    raise ValueError(f"Could not extract JSON from response: {text[:100]}...")
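
# Illustrative behaviour of extract_json (examples, not exhaustive tests):
#   extract_json('```json\n{"a": 1}\n```')      -> {"a": 1}
#   extract_json('noise before {"a": 1} after') -> {"a": 1}
#   extract_json('no json here')                -> raises ValueError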

class BusinessSolutionEvaluator:
    """LLM-as-judge that scores a solution on the five creativity metrics via Gemini."""

    def __init__(self, gemini_client: Optional[genai.GenerativeModel]):
        if not gemini_client:
            raise ValueError("BusinessSolutionEvaluator requires a Google/Gemini client.")
        self.gemini_model = gemini_client

    async def evaluate(self, problem: str, solution_text: str) -> Tuple[dict, dict]:
        """Return (scores_dict, usage_dict) for the given problem/solution pair."""
        print(f"Evaluating solution (live): {solution_text[:50]}...")
        base_prompt = EVALUATION_PROMPT_TEMPLATE.format(problem=problem, solution_text=solution_text)
        schema_instruction = """
[IMPORTANT SYSTEM INSTRUCTION]
Ignore any previous examples of JSON formatting in this prompt.
You MUST strictly follow the Output Schema provided below.
For EACH of the 5 metrics, you must provide an object with TWO fields: "score" (integer) and "justification" (string).
Do not output a list. Return a single JSON object.
"""
        final_prompt = base_prompt + schema_instruction
        usage = {"model": "Gemini", "input": 0, "output": 0}
        try:
            response = await self.gemini_model.generate_content_async(
                final_prompt,
                generation_config=genai.types.GenerationConfig(
                    response_mime_type="application/json",
                    response_schema=EVALUATION_SCHEMA
                )
            )
            if hasattr(response, "usage_metadata"):
                usage["input"] = response.usage_metadata.prompt_token_count
                usage["output"] = response.usage_metadata.candidates_token_count
            v_fitness = extract_json(response.text)
            if not isinstance(v_fitness, (dict, list)):
                raise ValueError(f"Judge returned invalid type: {type(v_fitness)}")
            return v_fitness, usage
        except Exception as e:
            print(f"ERROR: BusinessSolutionEvaluator failed: {e}")
            # Sentinel result so calibration can continue; callers treat missing metrics as score 0.
            return {"Novelty": {"score": 1, "justification": f"Error: {str(e)}"}}, usage

class AgentCalibrator:
    """Runs a quick head-to-head test to pick the best available LLM for each specialist role."""

    def __init__(self, api_clients: dict, evaluator: BusinessSolutionEvaluator):
        self.evaluator = evaluator
        self.api_clients = {name: client for name, client in api_clients.items() if client}
        self.sponsor_llms = list(self.api_clients.keys())
        print(f"AgentCalibrator initialized with enabled clients: {self.sponsor_llms}")

    async def calibrate_team(self, problem: str) -> Tuple[Dict[str, Any], List[str], List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Return (team_plan, error_log, detailed_results, usage_stats) for the given problem."""
        print(f"Running LIVE calibration test for specialist team on {self.sponsor_llms}...")
        error_log = []
        detailed_results = []
        all_usage_stats = []
        if not self.sponsor_llms:
            raise Exception("AgentCalibrator cannot run: No LLM clients are configured.")
        if len(self.sponsor_llms) == 1:
            # With a single provider there is nothing to compare, so assign it to every role.
            default_llm = self.sponsor_llms[0]
            print("Only one LLM available. Skipping calibration.")
            plan = {
                "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": default_llm},
                "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": default_llm},
                "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": default_llm}
            }
            return plan, error_log, [], []
        roles_to_test = {role: PERSONAS_DATA[key]["description"] for role, key in config.CALIBRATION_CONFIG["roles_to_test"].items()}
        test_problem = f"For the business problem '{problem}', generate a single, brief, one-paragraph concept-level solution."
        # Fan out: every (role, LLM) pair generates a test solution and is judged concurrently.
        tasks = []
        for role, persona in roles_to_test.items():
            for llm_name in self.sponsor_llms:
                tasks.append(self.run_calibration_test(problem, role, llm_name, persona, test_problem))
        results = await asyncio.gather(*tasks)
        detailed_results = results
        for res in results:
            if "usage_gen" in res:
                all_usage_stats.append(res["usage_gen"])
            if "usage_eval" in res:
                all_usage_stats.append(res["usage_eval"])
        # For each role, pick the LLM with the highest score on that role's target metric.
        best_llms = {}
        role_metrics = config.CALIBRATION_CONFIG["role_metrics"]
        for role in roles_to_test.keys():
            best_score = -1
            best_llm = self.sponsor_llms[0]
            for res in results:
                if res["role"] != role:
                    continue
                if res.get("error"):
                    error_log.append(f"Calibration failed for {res['llm']} (as {role}): {res['error']}")
                    continue
                metric = role_metrics[role]
                # The judge occasionally returns a list or malformed payload; normalise defensively.
                raw_score_data = res.get("score", {})
                if not isinstance(raw_score_data, (dict, list)):
                    raw_score_data = {}
                if isinstance(raw_score_data, list):
                    raw_score_data = raw_score_data[0] if len(raw_score_data) > 0 else {}
                metric_data = raw_score_data.get(metric, {})
                if not isinstance(metric_data, (dict, list)):
                    metric_data = {}
                if isinstance(metric_data, list):
                    metric_data = metric_data[0] if len(metric_data) > 0 else {}
                score = metric_data.get("score", 0)
                if score > best_score:
                    best_score = score
                    best_llm = res["llm"]
            best_llms[role] = best_llm
        team_plan = {
            "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": best_llms["Plant"]},
            "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": best_llms["Implementer"]},
            "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": best_llms["Monitor"]}
        }
        print(f"Calibration complete (live). Team plan: {team_plan}")
        return team_plan, error_log, detailed_results, all_usage_stats

    async def run_calibration_test(self, problem, role, llm_name, persona, test_problem):
        """Generate one test solution with (llm_name, persona) and have the judge score it."""
        client = self.api_clients[llm_name]
        solution, gen_usage = await get_llm_response(llm_name, client, persona, test_problem)
        if "Error generating response" in solution:
            return {"role": role, "llm": llm_name, "error": solution, "output": solution, "usage_gen": gen_usage}
        score, eval_usage = await self.evaluator.evaluate(problem, solution)
        return {
            "role": role, "llm": llm_name, "score": score, "output": solution, "usage_gen": gen_usage, "usage_eval": eval_usage
        }
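
# Hypothetical usage sketch (assumes the API clients are constructed elsewhere,
# e.g. in the app entry point, and passed in by provider name):
#   evaluator  = BusinessSolutionEvaluator(gemini_model)
#   calibrator = AgentCalibrator({"Gemini": gemini_model, "Anthropic": anthropic_client}, evaluator)
#   team_plan, errors, details, usage = await calibrator.calibrate_team("Reduce churn in our subscription app")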

# Provider-agnostic generation helper: Gemini, Anthropic, and OpenAI-compatible clients
# (SambaNova, OpenAI, Nebius) all go through the same interface.
async def get_llm_response(client_name: str, client, system_prompt: str, user_prompt: str) -> Tuple[str, dict]:
    """Returns (text_response, usage_dict)."""
    usage = {"model": client_name, "input": 0, "output": 0}
    try:
        if client_name == "Gemini":
            # No system role is used here; the system prompt is sent as a priming user turn.
            model = client
            full_prompt = [
                {'role': 'user', 'parts': [system_prompt]},
                {'role': 'model', 'parts': ["Understood."]},
                {'role': 'user', 'parts': [user_prompt]}
            ]
            response = await model.generate_content_async(full_prompt)
            if hasattr(response, "usage_metadata"):
                usage["input"] = response.usage_metadata.prompt_token_count
                usage["output"] = response.usage_metadata.candidates_token_count
            return response.text, usage
        elif client_name == "Anthropic":
            response = await client.messages.create(
                model=config.MODELS["Anthropic"]["default"],
                max_tokens=8192,
                system=system_prompt,
                messages=[{"role": "user", "content": user_prompt}]
            )
            if hasattr(response, "usage"):
                usage["input"] = response.usage.input_tokens
                usage["output"] = response.usage.output_tokens
            return response.content[0].text, usage
        elif client_name in ["SambaNova", "OpenAI", "Nebius"]:
            # OpenAI-compatible chat completions; the model ID comes from config.py.
            model_id = config.MODELS.get(client_name, {}).get("default", "gpt-4o-mini")
            completion = await client.chat.completions.create(
                model=model_id,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ]
            )
            if hasattr(completion, "usage"):
                usage["input"] = completion.usage.prompt_tokens
                usage["output"] = completion.usage.completion_tokens
            return completion.choices[0].message.content, usage
        # Unknown provider: report it in the same format the calibrator checks for.
        return f"Error generating response from {client_name}: unsupported client name", usage
    except Exception as e:
        error_message = f"Error generating response from {client_name}: {str(e)}"
        print(f"ERROR: API call to {client_name} failed: {e}")
        return error_message, usage
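

if __name__ == "__main__":
    # Minimal smoke test: a sketch only, assuming a GOOGLE_API_KEY environment variable
    # and a config.MODELS["Gemini"]["default"] entry (adjust to your actual config layout).
    import os

    async def _smoke_test():
        genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
        model = genai.GenerativeModel(config.MODELS.get("Gemini", {}).get("default", "gemini-1.5-flash"))
        text, usage = await get_llm_response(
            "Gemini",
            model,
            system_prompt="You are a concise business consultant.",
            user_prompt="Suggest one way a small bakery could reduce food waste.",
        )
        print(usage)
        print(text[:300])

    asyncio.run(_smoke_test())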