#!/usr/bin/env python3
"""
Gemini MCP Server

A Python-based MCP server that exposes Gemini AI capabilities via the Model Context Protocol.
It implements four tools: generate_content (text generation, translation, summarization,
document parsing), transcribe_audio, text_to_speech (delegated to the client), and search_web.
"""
import os
import sys
import json
import base64
import asyncio
import logging
from typing import Any, Sequence
from pathlib import Path

# MCP imports
try:
    from mcp.server import Server
    from mcp.types import Tool, TextContent
    import mcp.server.stdio
    # Additional imports needed for server functionality
    from mcp import types as mcp_types
    from mcp.types import ImageContent, EmbeddedResource
    from mcp.server.models import InitializationOptions
    from mcp.server import NotificationOptions
except ImportError:
    print("Error: MCP SDK not installed. Install with: pip install mcp", file=sys.stderr)
    sys.exit(1)
# Gemini imports
try:
    from google import genai
    GEMINI_AVAILABLE = True
except ImportError:
    print("Error: google-genai not installed. Install with: pip install google-genai", file=sys.stderr)
    sys.exit(1)

from supervisor import MAX_SEARCH_STRATEGIES
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Set MCP logging to WARNING to reduce noise
mcp_logger = logging.getLogger("mcp")
mcp_logger.setLevel(logging.WARNING)
# Note: logging.getLogger() (no argument) is the root logger itself;
# logging.getLogger("root") only aliases it on Python 3.9+.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Initialize Gemini
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    logger.error("GEMINI_API_KEY not set in environment variables")
    sys.exit(1)

# Initialize Gemini client
gemini_client = genai.Client(api_key=GEMINI_API_KEY)
# Configuration from environment
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
GEMINI_MODEL_LITE = os.environ.get("GEMINI_MODEL_LITE", "gemini-2.5-flash-lite")
GEMINI_TIMEOUT = int(os.environ.get("GEMINI_TIMEOUT", "300000"))  # milliseconds
GEMINI_MAX_OUTPUT_TOKENS = int(os.environ.get("GEMINI_MAX_OUTPUT_TOKENS", "8192"))
GEMINI_MAX_FILES = int(os.environ.get("GEMINI_MAX_FILES", "10"))
GEMINI_MAX_TOTAL_FILE_SIZE = int(os.environ.get("GEMINI_MAX_TOTAL_FILE_SIZE", "50"))  # MB
GEMINI_TEMPERATURE = float(os.environ.get("GEMINI_TEMPERATURE", "0.2"))
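
# Example environment setup (illustrative values; only GEMINI_API_KEY is required,
# everything else falls back to the defaults above):
#
#   export GEMINI_API_KEY="your-api-key"
#   export GEMINI_MODEL="gemini-2.5-flash"
#   export GEMINI_TIMEOUT="300000"        # milliseconds
#   export GEMINI_TEMPERATURE="0.2"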
# Initialize MCP server
server = Server("server-mcp-agent")
def decode_base64_file(content: str) -> bytes:
    """Decode base64-encoded file content."""
    try:
        return base64.b64decode(content)
    except Exception as e:
        logger.error(f"Error decoding base64 content: {e}")
        raise
def prepare_gemini_files(files: list) -> list:
    """Prepare files for the Gemini API."""
    gemini_parts = []
    for file_obj in files:
        try:
            # Handle file with path
            if "path" in file_obj:
                file_path = file_obj["path"]
                mime_type = file_obj.get("type")
                if not os.path.exists(file_path):
                    logger.warning(f"File not found: {file_path}")
                    continue
                # Read file
                with open(file_path, 'rb') as f:
                    file_data = f.read()
                # Auto-detect MIME type if not provided
                if not mime_type:
                    from mimetypes import guess_type
                    mime_type, _ = guess_type(file_path)
                    if not mime_type:
                        mime_type = "application/octet-stream"
            # Handle file with base64 content
            elif "content" in file_obj:
                file_data = decode_base64_file(file_obj["content"])
                mime_type = file_obj.get("type", "application/octet-stream")
            else:
                logger.warning("File object must have either 'path' or 'content'")
                continue
            # Add to Gemini parts
            gemini_parts.append({
                "mime_type": mime_type,
                "data": file_data
            })
        except Exception as e:
            logger.error(f"Error processing file: {e}")
            continue
    return gemini_parts
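
# Example (hypothetical inputs): prepare_gemini_files([{"path": "report.pdf"}])
# would return [{"mime_type": "application/pdf", "data": b"%PDF-..."}]; the tool
# handlers below wrap each entry into an inline_data part for the Gemini API.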
@server.list_tools()  # register with the MCP server so clients can discover the tools
async def list_tools() -> list[Tool]:
    """List available tools."""
    try:
        tools = [
            Tool(
                name="generate_content",
                description="Generate content using Gemini AI. Supports text generation, translation, summarization, document parsing, and audio transcription.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "user_prompt": {
                            "type": "string",
                            "description": "User prompt for generation (required)"
                        },
                        "system_prompt": {
                            "type": "string",
                            "description": "System prompt to guide AI behavior (optional)"
                        },
                        "files": {
                            "type": "array",
                            "description": "Array of files to include in generation (optional)",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "path": {"type": "string", "description": "Path to file"},
                                    "content": {"type": "string", "description": "Base64-encoded file content"},
                                    "type": {"type": "string", "description": "MIME type (auto-detected from file extension if omitted)"}
                                }
                            }
                        },
                        "model": {
                            "type": "string",
                            "description": f"Gemini model to use (default: {GEMINI_MODEL})"
                        },
                        "temperature": {
                            "type": "number",
                            "description": f"Temperature for generation, 0-2 (default: {GEMINI_TEMPERATURE})"
                        }
                    },
                    "required": ["user_prompt"]
                }
            ),
            Tool(
                name="transcribe_audio",
                description="Transcribe an audio file to text using Gemini AI. Supports various audio formats (WAV, MP3, M4A, etc.).",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "audio_path": {
                            "type": "string",
                            "description": "Path to audio file to transcribe (required)"
                        },
                        "language": {
                            "type": "string",
                            "description": "Language code (optional, defaults to auto-detect)"
                        }
                    },
                    "required": ["audio_path"]
                }
            ),
            Tool(
                name="text_to_speech",
                description="Convert text to speech. Note: this server does not generate audio itself; it returns a USE_LOCAL_TTS sentinel so the client can run a local TTS model.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "Text to convert to speech (required)"
                        },
                        "language": {
                            "type": "string",
                            "description": "Language code (optional, defaults to 'en')"
                        },
                        "voice": {
                            "type": "string",
                            "description": "Voice selection (optional)"
                        }
                    },
                    "required": ["text"]
                }
            ),
            Tool(
                name="search_web",
                description="Search the web for information. Returns search results with titles, URLs, and content snippets.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query string (required)"
                        },
                        "max_results": {
                            "type": "integer",
                            "description": f"Maximum number of results to return (optional, default: {MAX_SEARCH_STRATEGIES})",
                            "default": MAX_SEARCH_STRATEGIES
                        }
                    },
                    "required": ["query"]
                }
            )
        ]
        return tools
    except Exception as e:
        logger.error(f"Error in list_tools(): {e}")
        raise
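
# A minimal sketch of the JSON-RPC exchange the handler above enables (shapes
# follow the MCP spec; the values shown are illustrative):
#
#   -> {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
#   <- {"jsonrpc": "2.0", "id": 1,
#       "result": {"tools": [{"name": "generate_content", ...},
#                            {"name": "transcribe_audio", ...}, ...]}}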
@server.call_tool()  # register the tool-call handler with the MCP server
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
    """Handle tool calls."""
    logger.info(f"🔵 MCP tool call received: {name}")
| if name == "generate_content": | |
| try: | |
| user_prompt = arguments.get("user_prompt") | |
| if not user_prompt: | |
| logger.error("❌ user_prompt is required but missing") | |
| return [TextContent(type="text", text="Error: user_prompt is required")] | |
| system_prompt = arguments.get("system_prompt") | |
| files = arguments.get("files", []) | |
| model = arguments.get("model", GEMINI_MODEL) | |
| temperature = float(arguments.get("temperature", GEMINI_TEMPERATURE)) | |
| # Prepare content for Gemini API | |
| # The API accepts contents as a string or list | |
| # For files, we need to handle them differently | |
| contents = user_prompt | |
| # If system prompt is provided, prepend it to the user prompt | |
| if system_prompt: | |
| contents = f"{system_prompt}\n\n{user_prompt}" | |
| # Prepare content for Gemini API | |
| # The google-genai API expects contents as a list of parts | |
| gemini_contents = [] | |
| # Add text content as first part | |
| gemini_contents.append(contents) | |
            # Add file content if provided
            if files:
                try:
                    file_parts = prepare_gemini_files(files)
                    # Convert file parts to the format expected by the Gemini API:
                    # binary content goes in inline_data parts
                    for file_part in file_parts:
                        gemini_contents.append({
                            "inline_data": {
                                "mime_type": file_part["mime_type"],
                                "data": base64.b64encode(file_part["data"]).decode('utf-8')
                            }
                        })
                    logger.info(f"Added {len(file_parts)} file(s) to Gemini request")
                except Exception as e:
                    logger.warning(f"Error preparing files: {e}, continuing with text only")
            # Generate content using the Gemini API
            try:
                # Prepare generation config
                generation_config = {
                    "temperature": temperature,
                    "max_output_tokens": GEMINI_MAX_OUTPUT_TOKENS
                }
                # Convert timeout from milliseconds to seconds.
                # Cap at 18s to leave buffer for the client timeout (25s) and
                # communication overhead, so the server completes before the
                # client times out.
                timeout_seconds = min(GEMINI_TIMEOUT / 1000.0, 18.0)
                logger.info(f"🔵 Calling Gemini API with model={model}, timeout={timeout_seconds}s...")

                # Use asyncio.to_thread to run the blocking call off the event loop.
                # The API accepts contents as a list and config as a separate parameter.
                def generate_sync():
                    try:
                        logger.debug(f"Calling Gemini API synchronously (model={model})...")
                        result = gemini_client.models.generate_content(
                            model=model,
                            contents=gemini_contents,
                            config=generation_config,
                        )
                        logger.debug("Gemini API synchronous call completed")
                        return result
                    except Exception as sync_error:
                        logger.error(f"Error in synchronous Gemini API call: {type(sync_error).__name__}: {sync_error}")
                        raise

                logger.debug(f"Starting async wrapper for Gemini API call (timeout={timeout_seconds}s)...")
                response = await asyncio.wait_for(
                    asyncio.to_thread(generate_sync),
                    timeout=timeout_seconds
                )
| logger.info(f"✅ Gemini API call completed successfully") | |
| # Extract text from response | |
| if response and hasattr(response, 'text') and response.text: | |
| return [TextContent(type="text", text=response.text)] | |
| elif response and hasattr(response, 'candidates') and response.candidates: | |
| # Try to extract text from candidates if response is a list of candidates | |
| text_parts = [] | |
| for candidate in response.candidates: | |
| if hasattr(candidate, 'content') and hasattr(candidate.content, 'parts'): | |
| for part in candidate.content.parts: | |
| if hasattr(part, 'text'): | |
| text_parts.append(part.text) | |
| if text_parts: | |
| text = ''.join(text_parts) | |
| return [TextContent(type="text", text=text)] | |
| else: | |
| logger.warning("Gemini returned response but no text found") | |
| return [TextContent(type="text", text="Error: No text in Gemini response")] | |
| else: | |
| logger.warning("Gemini returned empty response") | |
| return [TextContent(type="text", text="Error: No response from Gemini")] | |
            except asyncio.TimeoutError:
                error_msg = f"Gemini API call timed out after {timeout_seconds}s"
                logger.error(f"❌ {error_msg}")
                logger.error(f"   Model: {model}, Prompt length: {len(user_prompt)} chars")
                logger.error("   This may indicate network issues, API rate limiting, or a request that is too complex")
                return [TextContent(type="text", text=f"Error: {error_msg}. The request may be too complex or there may be network issues.")]
            except Exception as e:
                logger.error(f"❌ Error generating content: {type(e).__name__}: {e}")
                import traceback
                logger.debug(f"Full traceback: {traceback.format_exc()}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
        except Exception as e:
            logger.error(f"Error in generate_content: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
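
    # Example call (hypothetical arguments a client might send to this tool):
    #
    #   call_tool("generate_content", {
    #       "user_prompt": "Summarize the attached document in three bullets.",
    #       "files": [{"path": "/tmp/report.pdf"}],
    #       "temperature": 0.1,
    #   })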
| elif name == "transcribe_audio": | |
| try: | |
| audio_path = arguments.get("audio_path") | |
| if not audio_path: | |
| logger.error("❌ audio_path is required but missing") | |
| return [TextContent(type="text", text="Error: audio_path is required")] | |
| language = arguments.get("language", "auto") | |
| # Check if file exists | |
| if not os.path.exists(audio_path): | |
| logger.error(f"❌ Audio file not found: {audio_path}") | |
| return [TextContent(type="text", text=f"Error: Audio file not found: {audio_path}")] | |
| # Use Gemini to transcribe audio | |
| system_prompt = "You are a professional transcription service. Provide accurate, well-formatted transcripts." | |
| user_prompt = "Please transcribe this audio file. Include speaker identification if multiple speakers are present, and format it with proper punctuation and paragraphs, remove mumble, ignore non-verbal noises." | |
| files = [{"path": os.path.abspath(audio_path)}] | |
| try: | |
| generation_config = { | |
| "temperature": 0.2, | |
| "max_output_tokens": GEMINI_MAX_OUTPUT_TOKENS | |
| } | |
| timeout_seconds = min(GEMINI_TIMEOUT / 1000.0, 20.0) | |
| logger.info(f"🔵 Transcribing audio with Gemini API, timeout={timeout_seconds}s...") | |
| gemini_contents = [f"{system_prompt}\n\n{user_prompt}"] | |
| file_parts = prepare_gemini_files(files) | |
| for file_part in file_parts: | |
| gemini_contents.append({ | |
| "inline_data": { | |
| "mime_type": file_part["mime_type"], | |
| "data": base64.b64encode(file_part["data"]).decode('utf-8') | |
| } | |
| }) | |
                def transcribe_sync():
                    return gemini_client.models.generate_content(
                        model=GEMINI_MODEL_LITE,
                        contents=gemini_contents,
                        config=generation_config,
                    )

                response = await asyncio.wait_for(
                    asyncio.to_thread(transcribe_sync),
                    timeout=timeout_seconds
                )
| logger.info(f"✅ Audio transcription completed successfully") | |
| if response and hasattr(response, 'text') and response.text: | |
| return [TextContent(type="text", text=response.text.strip())] | |
| elif response and hasattr(response, 'candidates') and response.candidates: | |
| text_parts = [] | |
| for candidate in response.candidates: | |
| if hasattr(candidate, 'content') and hasattr(candidate.content, 'parts'): | |
| for part in candidate.content.parts: | |
| if hasattr(part, 'text'): | |
| text_parts.append(part.text) | |
| if text_parts: | |
| text = ''.join(text_parts).strip() | |
| return [TextContent(type="text", text=text)] | |
| else: | |
| return [TextContent(type="text", text="Error: No text in transcription response")] | |
| else: | |
| return [TextContent(type="text", text="Error: No response from transcription")] | |
            except asyncio.TimeoutError:
                error_msg = "Audio transcription timed out"
                logger.error(f"❌ {error_msg}")
                return [TextContent(type="text", text=f"Error: {error_msg}")]
            except Exception as e:
                logger.error(f"❌ Error transcribing audio: {type(e).__name__}: {e}")
                import traceback
                logger.debug(f"Full traceback: {traceback.format_exc()}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
        except Exception as e:
            logger.error(f"Error in transcribe_audio: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
| elif name == "text_to_speech": | |
| try: | |
| text = arguments.get("text") | |
| if not text: | |
| logger.error("❌ text is required but missing") | |
| return [TextContent(type="text", text="Error: text is required")] | |
| language = arguments.get("language", "en") | |
| # Note: Gemini API doesn't directly support TTS audio generation | |
| # This tool is provided for MCP protocol compliance, but the client | |
| # should use local TTS models (like maya1) for actual audio generation | |
| logger.info(f"🔵 TTS request received for text: {text[:50]}...") | |
| logger.info("ℹ️ Gemini API doesn't support direct TTS. Client should use local TTS model.") | |
| # Return a signal that client should handle TTS locally | |
| # The client will interpret this and use its local TTS model | |
| return [TextContent(type="text", text="USE_LOCAL_TTS")] | |
| except Exception as e: | |
| logger.error(f"Error in text_to_speech: {e}") | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
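
    # Hypothetical client-side handling of the USE_LOCAL_TTS sentinel (names like
    # `session` and `local_tts_engine` are illustrative, not part of this server):
    #
    #   result = await session.call_tool("text_to_speech", {"text": "Hello"})
    #   if result.content[0].text == "USE_LOCAL_TTS":
    #       audio_path = local_tts_engine.synthesize("Hello")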
| elif name == "search_web": | |
| try: | |
| query = arguments.get("query") | |
| if not query: | |
| logger.error("❌ query is required but missing") | |
| return [TextContent(type="text", text="Error: query is required")] | |
| max_results = int(arguments.get("max_results", MAX_SEARCH_STRATEGIES)) | |
| # Use DuckDuckGo for web search | |
| try: | |
| from ddgs import DDGS | |
| import requests | |
| from bs4 import BeautifulSoup | |
| except ImportError: | |
| logger.error("DuckDuckGo dependencies not available (ddgs, requests, beautifulsoup4)") | |
| return [TextContent(type="text", text="Error: Web search dependencies not available")] | |
| logger.info(f"🔵 Performing web search for: {query[:100]}...") | |
| try: | |
| with DDGS() as ddgs: | |
| results = list(ddgs.text(query, max_results=max_results)) | |
                web_content = []
                for result in results:
                    try:
                        url = result.get('href', '')
                        title = result.get('title', '')
                        snippet = result.get('body', '')
                        try:
                            # Fetch the page and extract readable text
                            response = requests.get(url, timeout=5, headers={'User-Agent': 'Mozilla/5.0'})
                            if response.status_code == 200:
                                soup = BeautifulSoup(response.content, 'html.parser')
                                for script in soup(["script", "style"]):
                                    script.decompose()
                                text = soup.get_text()
                                lines = (line.strip() for line in text.splitlines())
                                chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
                                text = ' '.join(chunk for chunk in chunks if chunk)
                                if len(text) > 1000:
                                    text = text[:1000] + "..."
                                web_content.append({
                                    'title': title,
                                    'url': url,
                                    'content': (snippet + "\n" + text[:500]) if text else snippet
                                })
                            else:
                                web_content.append({
                                    'title': title,
                                    'url': url,
                                    'content': snippet
                                })
                        except Exception:
                            # Fall back to the search snippet if the page fetch fails
                            web_content.append({
                                'title': title,
                                'url': url,
                                'content': snippet
                            })
                    except Exception as e:
                        logger.error(f"Error processing search result: {e}")
                        continue
                # Return results as a JSON string
                results_json = json.dumps(web_content, indent=2)
                logger.info(f"✅ Web search completed: {len(web_content)} results")
                return [TextContent(type="text", text=results_json)]
            except Exception as e:
                logger.error(f"❌ Web search error: {type(e).__name__}: {e}")
                import traceback
                logger.debug(f"Full traceback: {traceback.format_exc()}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
        except Exception as e:
            logger.error(f"Error in search_web: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
    else:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
    """Main entry point."""
    logger.info("=" * 60)
    logger.info("Starting Gemini MCP Server...")
    logger.info(f"Gemini API Key: {'Set' if GEMINI_API_KEY else 'Not Set'}")
    logger.info(f"Default Model: {GEMINI_MODEL}")
    logger.info(f"Default Lite Model: {GEMINI_MODEL_LITE}")
    logger.info("=" * 60)

    # Keep logging enabled for debugging
    original_root_level = logging.getLogger().level
    logging.getLogger().setLevel(logging.INFO)
    try:
        # Use stdio_server from mcp.server.stdio
        from mcp.server.stdio import stdio_server
        async with stdio_server() as streams:
            # Gather server capabilities for initialization
            try:
                if hasattr(server, "get_capabilities"):
                    notification_options = NotificationOptions()
                    experimental_capabilities: dict[str, dict[str, Any]] = {}
                    server_capabilities = server.get_capabilities(
                        notification_options=notification_options,
                        experimental_capabilities=experimental_capabilities,
                    )
                else:
                    server_capabilities = mcp_types.ServerCapabilities()
            except Exception as cap_error:
                logger.warning(f"Failed to gather server capabilities: {cap_error}")
                server_capabilities = mcp_types.ServerCapabilities()

            init_options = InitializationOptions(
                server_name="gemini-mcp-server",
                server_version="1.0.0",
                capabilities=server_capabilities,
            )
| logger.info("MCP server ready") | |
| try: | |
| # Run the server - it will automatically handle the initialization handshake | |
| await server.run( | |
| read_stream=streams[0], | |
| write_stream=streams[1], | |
| initialization_options=init_options, | |
| ) | |
| except Exception as run_error: | |
| logger.error(f"Error in server.run(): {run_error}") | |
| raise | |
| except Exception as e: | |
| logging.getLogger("root").setLevel(original_root_level) | |
| logger.error(f"MCP server fatal error: {type(e).__name__}: {e}") | |
| raise | |
if __name__ == "__main__":
    asyncio.run(main())
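
# Example stdio client configuration (illustrative: the file name
# "gemini_mcp_server.py" is an assumption; the "mcpServers" format shown is the
# one used by clients such as Claude Desktop):
#
#   {
#     "mcpServers": {
#       "gemini": {
#         "command": "python3",
#         "args": ["gemini_mcp_server.py"],
#         "env": {"GEMINI_API_KEY": "your-api-key"}
#       }
#     }
#   }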