Spaces:
Running
Running
| """ | |
| LangGraph Agent for Vibe Reader | |
| Implements the agentic workflow for book recommendation based on visual vibes | |
| """ | |
| import os | |
| import json | |
| from typing import TypedDict, List, Dict, Any, Literal, Annotated | |
| from operator import add | |
| from openai import OpenAI | |
| from langchain_core.messages import HumanMessage, AIMessage, SystemMessage | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.types import interrupt | |
| from dotenv import load_dotenv | |
| # ============================================================================ | |
| # CONFIGURATION | |
| # ============================================================================ | |
| NEBIUS_API_KEY = os.getenv("NEBIUS_API_KEY") | |
| NEBIUS_BASE_URL = "https://api.tokenfactory.nebius.com/v1/" | |
| VLM_MODEL = "google/gemma-3-27b-it-fast" | |
| REASONING_MODEL = "Qwen/Qwen3-30B-A3B-Thinking-2507" | |
| FAST_MODEL = "moonshotai/Kimi-K2-Instruct" # Non-thinking model for simple tasks | |
| MODAL_VECTOR_STORE_URL = os.getenv("MODAL_VECTOR_STORE_URL", "https://placeholder-modal-url.modal.run/search") | |
| GOOGLE_BOOKS_MCP_URL = os.getenv("GOOGLE_BOOKS_MCP_URL", "https://mcp-1st-birthday-google-books-mcp.hf.space") | |
| NUM_BOOKS_TO_RETRIEVE = 7 # Target number of books with valid descriptions | |
| NUM_BOOKS_TO_FETCH = 13 # Fetch extra to account for books without descriptions | |
| NUM_FINAL_BOOKS = 3 | |
| # ============================================================================ | |
| # STATE DEFINITION | |
| # ============================================================================ | |
| class AgentState(TypedDict): | |
| """State maintained throughout the agent workflow""" | |
| # User inputs | |
| images: List[str] # List of image URLs or base64 encoded images | |
| # Conversation history (no reducer - we manage the list directly) | |
| messages: List[Dict[str, str]] | |
| # Vibe components (from JSON extraction) | |
| aesthetic_genre_keywords: List[str] # Genre/aesthetic keywords | |
| mood_atmosphere: List[str] # Mood descriptors | |
| core_themes: List[str] # Core themes | |
| tropes: List[str] # Story tropes | |
| feels_like: str # User-facing "feels like" description (what gets refined) | |
| vibe_refinement_count: int # Number of refinement iterations | |
| # Book retrieval | |
| retrieved_books: List[Dict[str, str]] # List of {title, author} dicts | |
| books_with_metadata: List[Dict[str, Any]] # Enriched with Google Books data | |
| # Narrowing process | |
| q1_question: str # First narrowing question (stored for resume) | |
| q2_question: str # Second narrowing question (stored for resume) | |
| user_preferences: Dict[str, Any] # Accumulated user preferences from Q&A (question + answer pairs) | |
| final_books: List[Dict[str, Any]] # Final 3 books | |
| # Final outputs | |
| soundtrack_url: str # ElevenLabs generated soundtrack | |
| # Debug/reasoning (no reducer - we manage the list directly) | |
| reasoning: List[str] | |
| # ============================================================================ | |
| # HELPER FUNCTIONS | |
| # ============================================================================ | |
| def create_openai_client() -> OpenAI: | |
| """Create OpenAI client configured for Nebius""" | |
| return OpenAI(api_key=NEBIUS_API_KEY, base_url=NEBIUS_BASE_URL) | |
| def call_llm(messages: List[Dict[str, Any]], temperature: float = 0.7, model: str = REASONING_MODEL, include_reasoning: bool = False, max_tokens: int = 2500): | |
| """Generic LLM call for reasoning and decision-making using Nebius API | |
| Args: | |
| messages: Conversation messages | |
| temperature: Sampling temperature | |
| model: Model to use | |
| include_reasoning: If True, returns tuple of (content, reasoning_text) | |
| max_tokens: Maximum tokens for response (default 1000) | |
| Returns: | |
| str or tuple: Response content, or (content, reasoning) if include_reasoning=True | |
| """ | |
| client = create_openai_client() # Uses Nebius | |
| response = client.chat.completions.create( | |
| model=model, | |
| messages=messages, | |
| temperature=temperature, | |
| max_tokens=max_tokens | |
| ) | |
| message = response.choices[0].message | |
| content = message.content or "" | |
| if include_reasoning: | |
| # Nebius API returns reasoning in a separate field for Thinking models | |
| reasoning = getattr(message, 'reasoning_content', None) or "" | |
| if reasoning: | |
| # If content is empty, log a warning but don't try to extract from reasoning | |
| # (the last line of reasoning is usually garbage, not the answer) | |
| if not content.strip(): | |
| print(f"[DEBUG AGENT] Warning: LLM returned empty content with reasoning. This may indicate an issue.") | |
| return content, reasoning | |
| # Fallback: try parsing <think>...</think> tags from content | |
| import re | |
| think_match = re.match(r'<think>(.*?)</think>(.*)', content, re.DOTALL) | |
| if think_match: | |
| reasoning = think_match.group(1).strip() | |
| final_content = think_match.group(2).strip() | |
| return final_content, reasoning | |
| # No reasoning found | |
| return content, "No reasoning trace found" | |
| return content | |
| # ============================================================================ | |
| # NODES | |
| # ============================================================================ | |
| def generate_initial_vibe(state: AgentState) -> AgentState: | |
| """Node: Generate initial vibe description from uploaded images using VLM""" | |
| from prompts import VIBE_EXTRACTION | |
| from utils import parse_json_response, extract_vibe_components | |
| client = create_openai_client() | |
| # Construct message with images | |
| content = [{"type": "text", "text": "Analyze these images and extract the vibe:"}] | |
| for img in state["images"]: | |
| # Convert local file paths to base64 data URLs if needed | |
| if img.startswith(('http://', 'https://', 'data:')): | |
| # Already a valid URL | |
| image_url = img | |
| else: | |
| # Local file path - convert to base64 | |
| import base64 | |
| from pathlib import Path | |
| img_path = Path(img) | |
| if img_path.exists(): | |
| with open(img_path, 'rb') as f: | |
| img_data = base64.b64encode(f.read()).decode('utf-8') | |
| # Determine MIME type from extension | |
| ext = img_path.suffix.lower() | |
| mime_types = {'.jpg': 'jpeg', '.jpeg': 'jpeg', '.png': 'png', '.gif': 'gif', '.webp': 'webp'} | |
| mime = mime_types.get(ext, 'jpeg') | |
| image_url = f"data:image/{mime};base64,{img_data}" | |
| else: | |
| state["reasoning"].append(f"⚠️ Warning: Image file not found: {img}") | |
| continue | |
| content.append({ | |
| "type": "image_url", | |
| "image_url": {"url": image_url} | |
| }) | |
| response = client.chat.completions.create( | |
| model=VLM_MODEL, | |
| messages=[ | |
| {"role": "system", "content": VIBE_EXTRACTION}, | |
| {"role": "user", "content": content} | |
| ], | |
| temperature=0.7, | |
| max_tokens=2000 | |
| ) | |
| vibe_json_str = response.choices[0].message.content | |
| # Parse JSON response | |
| vibe_json = parse_json_response(vibe_json_str) | |
| if not vibe_json: | |
| state["reasoning"].append(f"❌ Failed to parse vibe JSON. Raw response: {vibe_json_str[:200]}") | |
| # Fallback to simple extraction | |
| state["feels_like"] = vibe_json_str | |
| state["aesthetic_genre_keywords"] = [] | |
| state["mood_atmosphere"] = [] | |
| state["core_themes"] = [] | |
| state["tropes"] = [] | |
| else: | |
| # Extract components | |
| components = extract_vibe_components(vibe_json) | |
| state["aesthetic_genre_keywords"] = components["aesthetic_genre_keywords"] | |
| state["mood_atmosphere"] = components["mood_atmosphere"] | |
| state["core_themes"] = components["core_themes"] | |
| state["tropes"] = components["tropes"] | |
| state["feels_like"] = components["feels_like"] | |
| state["reasoning"].append(f"✅ Extracted vibe components:\n" | |
| f" - Aesthetics: {', '.join(state['aesthetic_genre_keywords'])}\n" | |
| f" - Mood: {', '.join(state['mood_atmosphere'])}\n" | |
| f" - Themes: {', '.join(state['core_themes'])}\n" | |
| f" - Tropes: {', '.join(state['tropes'])}") | |
| state["vibe_refinement_count"] = 0 | |
| # Only show feels_like to user | |
| assistant_message = f"Here's the vibe I'm getting from your images:\n\n{state['feels_like']}\n\nDoes this capture what you're looking for, or would you like me to adjust it?" | |
| state["messages"].append({ | |
| "role": "assistant", | |
| "content": assistant_message | |
| }) | |
| # Wait for user feedback; when resumed, user_response will contain their reply | |
| user_response = interrupt(assistant_message) | |
| if user_response: | |
| state["messages"].append({"role": "user", "content": user_response}) | |
| return state | |
| def refine_vibe(state: AgentState) -> AgentState: | |
| """Node: Refine vibe based on user feedback - only refines feels_like portion""" | |
| from prompts import VIBE_REFINEMENT | |
| from utils import strip_thinking_tags | |
| print("[DEBUG AGENT] refine_vibe node started") | |
| # Get the latest user message (feedback) | |
| user_messages = [m for m in state["messages"] if m.get("role") == "user"] | |
| print(f"[DEBUG AGENT] Found {len(user_messages)} user messages") | |
| if not user_messages: | |
| state["reasoning"].append("⚠️ No user feedback found for refinement; skipping refine_vibe step") | |
| return state | |
| user_feedback = user_messages[-1]["content"] | |
| print(f"[DEBUG AGENT] user_feedback: {user_feedback[:50] if user_feedback else 'None'}...") | |
| # Use LLM to refine only the feels_like description | |
| # Keep other vibe components (aesthetics, themes, tropes) unchanged | |
| messages = [ | |
| {"role": "system", "content": VIBE_REFINEMENT}, | |
| {"role": "user", "content": f"Current 'feels like' description: {state['feels_like']}\n\nUser feedback: {user_feedback}\n\nProvide the refined 'feels like' description (4-5 sentences):"} | |
| ] | |
| print(f"[DEBUG AGENT] Calling LLM for refinement...") | |
| refined_feels_like, reasoning = call_llm(messages, temperature=0.7, include_reasoning=True) | |
| print(f"[DEBUG AGENT] LLM returned content: {refined_feels_like[:200] if refined_feels_like else 'None'}...") | |
| print(f"[DEBUG AGENT] LLM reasoning: {reasoning[:200] if reasoning else 'None'}...") | |
| # Ensure no thinking tags leak into the feels_like | |
| refined_feels_like = strip_thinking_tags(refined_feels_like) | |
| # Update only the feels_like portion | |
| state["feels_like"] = refined_feels_like | |
| state["vibe_refinement_count"] += 1 | |
| assistant_message = f"I've refined the vibe:\n\n{refined_feels_like}\n\nIs this better, or would you like further adjustments?" | |
| print(f"[DEBUG AGENT] Adding assistant message to state, current msg count: {len(state['messages'])}") | |
| state["messages"].append({ | |
| "role": "assistant", | |
| "content": assistant_message | |
| }) | |
| state["reasoning"].append(f"🧠 REASONING (Vibe Refinement #{state['vibe_refinement_count']}):\n{reasoning}\n") | |
| print(f"[DEBUG AGENT] After append, msg count: {len(state['messages'])}") | |
| # Wait for user feedback on the refined vibe | |
| print(f"[DEBUG AGENT] About to call interrupt()") | |
| user_response = interrupt(assistant_message) | |
| print(f"[DEBUG AGENT] interrupt() returned: {user_response}") | |
| if user_response: | |
| state["messages"].append({"role": "user", "content": user_response}) | |
| return state | |
| def check_vibe_satisfaction(state: AgentState) -> Literal["refine", "retrieve"]: | |
| """Conditional edge: Check if user is satisfied with vibe description""" | |
| from prompts import VIBE_SATISFACTION_CHECKER | |
| # Get the last user message | |
| user_messages = [m for m in state["messages"] if m.get("role") == "user"] | |
| if not user_messages: | |
| # No explicit feedback; default to moving forward | |
| return "retrieve" | |
| raw_content = user_messages[-1]["content"] | |
| # Content may occasionally be a non-string (e.g., list from upstream tools); | |
| # normalize to text before passing into the LLM. | |
| if isinstance(raw_content, str): | |
| last_user_msg = raw_content | |
| elif isinstance(raw_content, list): | |
| # Join any text-like chunks into a single string representation | |
| last_user_msg = " ".join(str(x) for x in raw_content) | |
| else: | |
| last_user_msg = str(raw_content) | |
| # Use LLM to determine satisfaction | |
| messages = [ | |
| {"role": "system", "content": VIBE_SATISFACTION_CHECKER}, | |
| {"role": "user", "content": f"User's response: {last_user_msg}"} | |
| ] | |
| decision, reasoning = call_llm(messages, temperature=0.0, include_reasoning=True) | |
| decision = decision.strip().lower() if decision else "" | |
| print(f"[DEBUG] check_vibe_satisfaction - user said: '{last_user_msg}'") | |
| print(f"[DEBUG] check_vibe_satisfaction - LLM decision: '{decision}'") | |
| state["reasoning"].append(f"🧠 REASONING (Satisfaction Check):\n{reasoning}\n\n→ Decision: {decision}") | |
| if "satisfied" in decision and "not_satisfied" not in decision: | |
| print(f"[DEBUG] check_vibe_satisfaction -> RETRIEVE (user satisfied)") | |
| return "retrieve" | |
| else: | |
| print(f"[DEBUG] check_vibe_satisfaction -> REFINE (user not satisfied)") | |
| return "refine" | |
| def retrieve_books(state: AgentState) -> AgentState: | |
| """Node: Retrieve books from Modal vector store""" | |
| import requests | |
| # Construct full vibe query from all components | |
| vibe_query = f"{state['feels_like']}\n\nGenres/Aesthetics: {', '.join(state['aesthetic_genre_keywords'])}\nMood: {', '.join(state['mood_atmosphere'])}\nThemes: {', '.join(state['core_themes'])}\nTropes: {', '.join(state['tropes'])}" | |
| try: | |
| # Call Modal vector store endpoint | |
| print(f"DEBUG: Calling Modal URL: {MODAL_VECTOR_STORE_URL}") | |
| state["reasoning"].append(f"📚 Calling Modal vector store with full vibe profile") | |
| state["reasoning"].append(f"URL: {MODAL_VECTOR_STORE_URL}") | |
| response = requests.post( | |
| MODAL_VECTOR_STORE_URL, | |
| json={ | |
| "query": vibe_query, | |
| "top_k": NUM_BOOKS_TO_RETRIEVE, | |
| "min_books_per_vibe": 1 | |
| }, | |
| timeout=180 # Long timeout for cold start | |
| ) | |
| print(f"DEBUG: Response status: {response.status_code}") | |
| print(f"DEBUG: Response text: {response.text[:500] if response.text else 'empty'}") | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Extract books from search results with diversity across vibes | |
| # Modal returns: {"results": [{"books": [...], "vibe_data": {...}, "score": ...}], ...} | |
| # Strategy: Take up to MAX_BOOKS_PER_VIBE from each vibe to ensure diversity | |
| MAX_BOOKS_PER_VIBE = 5 | |
| books = [] | |
| seen = set() # Track seen books for deduplication | |
| for result in data.get("results", []): | |
| vibe_score = result.get("score", 0) | |
| vibe_books = result.get("books", []) | |
| books_from_this_vibe = 0 | |
| for book in vibe_books: | |
| if books_from_this_vibe >= MAX_BOOKS_PER_VIBE: | |
| break | |
| title = book.get("title", "") | |
| author = book.get("author", "") | |
| key = (title.lower(), author.lower()) | |
| # Skip duplicates | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| books.append({ | |
| "title": title, | |
| "author": author, | |
| "vibe_score": vibe_score # Track which vibe it came from | |
| }) | |
| books_from_this_vibe += 1 | |
| # Fetch extra books to account for filtering (books without descriptions) | |
| books = books[:NUM_BOOKS_TO_FETCH] | |
| state["reasoning"].append(f"Retrieved {len(books)} books from {len(data.get('results', []))} vibes (max {MAX_BOOKS_PER_VIBE} per vibe)") | |
| else: | |
| raise Exception(f"HTTP {response.status_code}: {response.text[:200]}") | |
| except Exception as e: | |
| # Fallback to mock data for development | |
| print(f"DEBUG ERROR: Vector store call failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| state["reasoning"].append(f"Vector store call failed: {e}. Using mock data.") | |
| books = [ | |
| {"title": "The Night Circus", "author": "Erin Morgenstern"}, | |
| {"title": "The Ocean at the End of the Lane", "author": "Neil Gaiman"}, | |
| {"title": "The Starless Sea", "author": "Erin Morgenstern"}, | |
| {"title": "Piranesi", "author": "Susanna Clarke"}, | |
| {"title": "The House in the Cerulean Sea", "author": "TJ Klune"}, | |
| {"title": "Howl's Moving Castle", "author": "Diana Wynne Jones"}, | |
| {"title": "Circe", "author": "Madeline Miller"}, | |
| {"title": "The Invisible Life of Addie LaRue", "author": "V.E. Schwab"}, | |
| {"title": "Mexican Gothic", "author": "Silvia Moreno-Garcia"}, | |
| {"title": "The Ten Thousand Doors of January", "author": "Alix E. Harrow"}, | |
| {"title": "The Goblin Emperor", "author": "Katherine Addison"}, | |
| {"title": "The Priory of the Orange Tree", "author": "Samantha Shannon"}, | |
| {"title": "Uprooted", "author": "Naomi Novik"}, | |
| {"title": "The Bear and the Nightingale", "author": "Katherine Arden"}, | |
| {"title": "The City of Brass", "author": "S.A. Chakraborty"} | |
| ] | |
| state["retrieved_books"] = books | |
| state["reasoning"].append(f"Total books in state: {len(books)}") | |
| return state | |
| def call_google_books_mcp(title: str, author: str = "") -> Dict[str, Any]: | |
| """ | |
| Call the Google Books MCP server via Gradio MCP endpoint. | |
| Args: | |
| title: Book title | |
| author: Book author (optional) | |
| Returns: | |
| Book metadata dict or None if not found | |
| """ | |
| import requests | |
| try: | |
| # Gradio MCP endpoint (Streamable HTTP transport) | |
| mcp_url = f"{GOOGLE_BOOKS_MCP_URL}/gradio_api/mcp/" | |
| # MCP uses JSON-RPC style calls | |
| payload = { | |
| "jsonrpc": "2.0", | |
| "method": "tools/call", | |
| "params": { | |
| "name": "google_books_mcp_search_book_by_title_author", | |
| "arguments": { | |
| "title": title, | |
| "author": author | |
| } | |
| }, | |
| "id": 1 | |
| } | |
| response = requests.post( | |
| mcp_url, | |
| json=payload, | |
| headers={ | |
| "Content-Type": "application/json", | |
| "Accept": "application/json, text/event-stream" | |
| }, | |
| timeout=30 | |
| ) | |
| if response.status_code != 200: | |
| print(f"[DEBUG] Google Books MCP failed: {response.status_code} - {response.text[:200]}") | |
| return None | |
| # Parse SSE response | |
| for line in response.text.split('\n'): | |
| if line.startswith('data: '): | |
| try: | |
| data = json.loads(line[6:]) | |
| if "result" in data: | |
| result = data["result"] | |
| if isinstance(result, dict): | |
| # Check if it's a direct book response | |
| if "success" in result and "book" in result: | |
| if result.get("success") and result.get("book"): | |
| return result["book"] | |
| # Check if it's a content array response | |
| elif "content" in result: | |
| for content_item in result["content"]: | |
| if content_item.get("type") == "text": | |
| text_content = content_item.get("text", "") | |
| if text_content.strip(): | |
| try: | |
| book_data = json.loads(text_content) | |
| if book_data.get("success") and book_data.get("found"): | |
| return book_data.get("book") | |
| except json.JSONDecodeError: | |
| continue | |
| return result | |
| except json.JSONDecodeError: | |
| continue | |
| return None | |
| except Exception as e: | |
| print(f"[DEBUG] Google Books MCP error: {e}") | |
| return None | |
| def fetch_book_metadata(state: AgentState) -> AgentState: | |
| """Node: Fetch metadata for retrieved books via Google Books API""" | |
| print(f"[DEBUG AGENT] fetch_book_metadata node started with {len(state.get('retrieved_books', []))} books") | |
| enriched_books = [] | |
| skipped_books = [] | |
| state["reasoning"].append(f"📖 Fetching metadata from Google Books (need {NUM_BOOKS_TO_RETRIEVE} with descriptions)...") | |
| for book in state["retrieved_books"]: | |
| # Stop once we have enough books with valid descriptions | |
| if len(enriched_books) >= NUM_BOOKS_TO_RETRIEVE: | |
| print(f"[DEBUG] Reached target of {NUM_BOOKS_TO_RETRIEVE} books, stopping") | |
| break | |
| try: | |
| # Use Google Books MCP server | |
| metadata = call_google_books_mcp(book['title'], book['author']) | |
| if metadata and metadata.get("title"): | |
| description = metadata.get("description", "") | |
| # FILTER: Skip books without meaningful descriptions | |
| if not description or len(description.strip()) < 50: | |
| skipped_books.append(book['title']) | |
| print(f"[DEBUG] Skipping '{book['title']}' - no/short description ({len(description.strip()) if description else 0} chars)") | |
| continue | |
| # Format authors as string | |
| authors = metadata.get("authors", []) | |
| author_str = ", ".join(authors) if isinstance(authors, list) else authors or book["author"] | |
| enriched_books.append({ | |
| "title": metadata.get("title", book["title"]), | |
| "author": author_str, | |
| "description": description, | |
| "cover_url": metadata.get("thumbnail"), | |
| "isbn": metadata.get("isbn"), | |
| "published_year": metadata.get("published_date", "")[:4] if metadata.get("published_date") else None, | |
| "page_count": metadata.get("page_count"), | |
| "categories": metadata.get("categories", []), | |
| "preview_link": metadata.get("preview_link"), | |
| "info_link": metadata.get("info_link") | |
| }) | |
| print(f"[DEBUG] Found metadata for: {book['title']} ({len(description)} chars) [{len(enriched_books)}/{NUM_BOOKS_TO_RETRIEVE}]") | |
| else: | |
| # No results found - skip | |
| skipped_books.append(book['title']) | |
| print(f"[DEBUG] Skipping '{book['title']}' - no Google Books results") | |
| except Exception as e: | |
| # On any error, skip the book | |
| skipped_books.append(book['title']) | |
| state["reasoning"].append(f"Error fetching metadata for '{book['title']}': {str(e)}") | |
| state["books_with_metadata"] = enriched_books | |
| if skipped_books: | |
| state["reasoning"].append(f"⚠️ Skipped {len(skipped_books)} books without descriptions") | |
| state["reasoning"].append(f"✅ Found {len(enriched_books)}/{NUM_BOOKS_TO_RETRIEVE} books with full metadata") | |
| return state | |
| def _generate_narrowing_question(state: AgentState, question_num: int) -> tuple: | |
| """Helper: Generate a narrowing question""" | |
| from prompts import NARROWING_QUESTION_GENERATOR | |
| books_summary_parts = [] | |
| for i, b in enumerate(state["books_with_metadata"], 1): | |
| desc = b.get('description', 'No description') | |
| cats = ', '.join(b.get('categories', [])) if b.get('categories') else 'Uncategorized' | |
| books_summary_parts.append(f"Book {i}: {b['title']} by {b['author']}\n Categories: {cats}\n Description: {desc}") | |
| books_summary = "\n\n".join(books_summary_parts) | |
| vibe_context = f"Feels like: {state['feels_like']}\nAesthetics: {', '.join(state['aesthetic_genre_keywords'])}\nMood: {', '.join(state['mood_atmosphere'])}\nThemes: {', '.join(state['core_themes'])}" | |
| is_last = question_num >= 2 | |
| question_context = f"This is question {question_num} of 2." + (" THIS IS THE LAST QUESTION - make it count!" if is_last else "") | |
| user_prompt = f"Books to narrow down:\n{books_summary}\n\nVibe:\n{vibe_context}\n\nPrevious preferences: {json.dumps(state.get('user_preferences', {}), indent=2)}\n\n{question_context}\n\nGenerate an either/or question:" | |
| messages = [ | |
| {"role": "system", "content": NARROWING_QUESTION_GENERATOR}, | |
| {"role": "user", "content": user_prompt} | |
| ] | |
| return call_llm(messages, temperature=0.8, model=FAST_MODEL, include_reasoning=True) | |
| def generate_question_1(state: AgentState) -> AgentState: | |
| """Node: Generate Q1 and add to messages""" | |
| print(f"[DEBUG AGENT] generate_question_1") | |
| question, reasoning = _generate_narrowing_question(state, 1) | |
| state["narrowing_questions_asked"] = 1 | |
| state["q1_question"] = question | |
| state["reasoning"].append(f"🧠 REASONING (Narrowing Question #1):\n{reasoning}\n\n→ Question: {question}") | |
| assistant_message = f"To help me find the perfect match:\n\n{question}" | |
| print(f"[DEBUG AGENT] Q1: {question[:60]}...") | |
| state["messages"].append({"role": "assistant", "content": assistant_message}) | |
| return state | |
| def wait_for_answer_1(state: AgentState) -> AgentState: | |
| """Node: Wait for user's answer to Q1""" | |
| print(f"[DEBUG AGENT] wait_for_answer_1") | |
| user_answer = interrupt("Waiting for Q1 answer") | |
| if user_answer: | |
| state["messages"].append({"role": "user", "content": user_answer}) | |
| state["user_preferences"]["q1"] = { | |
| "question": state.get("q1_question", ""), | |
| "answer": user_answer | |
| } | |
| print(f"[DEBUG AGENT] Q1 answered: {user_answer}") | |
| return state | |
| def generate_question_2(state: AgentState) -> AgentState: | |
| """Node: Generate Q2 and add to messages""" | |
| print(f"[DEBUG AGENT] generate_question_2") | |
| question, reasoning = _generate_narrowing_question(state, 2) | |
| state["narrowing_questions_asked"] = 2 | |
| state["q2_question"] = question | |
| state["reasoning"].append(f"🧠 REASONING (Narrowing Question #2):\n{reasoning}\n\n→ Question: {question}") | |
| assistant_message = f"To help me find the perfect match:\n\n{question}" | |
| print(f"[DEBUG AGENT] Q2: {question[:60]}...") | |
| state["messages"].append({"role": "assistant", "content": assistant_message}) | |
| return state | |
| def wait_for_answer_2(state: AgentState) -> AgentState: | |
| """Node: Wait for user's answer to Q2""" | |
| print(f"[DEBUG AGENT] wait_for_answer_2") | |
| user_answer = interrupt("Waiting for Q2 answer") | |
| if user_answer: | |
| state["messages"].append({"role": "user", "content": user_answer}) | |
| state["user_preferences"]["q2"] = { | |
| "question": state.get("q2_question", ""), | |
| "answer": user_answer | |
| } | |
| print(f"[DEBUG AGENT] Q2 answered: {user_answer}") | |
| return state | |
| def check_narrowing_complete(state: AgentState) -> Literal["ask_more", "finalize"]: | |
| """Conditional edge: Check if we've asked all 2 narrowing questions""" | |
| questions_asked = state.get("narrowing_questions_asked", 0) | |
| if questions_asked >= 2: | |
| return "finalize" | |
| return "ask_more" | |
| def finalize_books(state: AgentState) -> AgentState: | |
| """Node: Use reasoning to select final 3 books based on vibe and preferences""" | |
| print(f"[DEBUG AGENT] finalize_books node started") | |
| print(f"[DEBUG AGENT] books_with_metadata count: {len(state.get('books_with_metadata', []))}") | |
| from prompts import get_book_finalizer_prompt | |
| # Build detailed book summary with full descriptions - no truncation | |
| books_summary_parts = [] | |
| for i, b in enumerate(state["books_with_metadata"]): | |
| desc = b.get('description', 'No description available') | |
| cats = ', '.join(b.get('categories', [])) if b.get('categories') else 'Uncategorized' | |
| books_summary_parts.append(f"{i+1}. {b['title']} by {b['author']}\n Categories: {cats}\n Description: {desc}") | |
| books_summary = "\n\n".join(books_summary_parts) | |
| prefs_summary = json.dumps(state.get("user_preferences", {}), indent=2) | |
| vibe_context = f"Feels like: {state['feels_like']}\nAesthetics: {', '.join(state['aesthetic_genre_keywords'])}\nMood: {', '.join(state['mood_atmosphere'])}\nThemes: {', '.join(state['core_themes'])}\nTropes: {', '.join(state['tropes'])}" | |
| user_prompt = f"Vibe:\n{vibe_context}\n\nCandidate Books:\n{books_summary}\n\nUser Preferences (from Q&A):\n{prefs_summary}\n\nSelect the {NUM_FINAL_BOOKS} best matches (return only JSON array):" | |
| messages = [ | |
| {"role": "system", "content": get_book_finalizer_prompt(NUM_FINAL_BOOKS)}, | |
| {"role": "user", "content": user_prompt} | |
| ] | |
| print(f"[DEBUG AGENT] finalize_books user_prompt:\n{user_prompt}") | |
| # Use reasoning model for book selection - this is a complex decision | |
| # Increase max_tokens since we're sending full book descriptions | |
| selection_response, reasoning = call_llm(messages, temperature=0.3, model=REASONING_MODEL, include_reasoning=True, max_tokens=5000) | |
| # Log reasoning even if empty | |
| state["reasoning"].append(f"🧠 REASONING (Book Selection):\n{reasoning or 'No reasoning provided'}") | |
| # Parse the JSON response - check both content and reasoning for the array | |
| try: | |
| import re | |
| # First try to find JSON array in the response content | |
| json_match = re.search(r'\[([\d,\s]+)\]', selection_response) | |
| # If not found in content, try to find it in reasoning (some models put answer there) | |
| if not json_match and reasoning: | |
| json_match = re.search(r'\[([\d,\s]+)\]', reasoning) | |
| if json_match: | |
| print(f"[DEBUG AGENT] Found JSON in reasoning instead of content") | |
| if json_match: | |
| indices = json.loads(json_match.group(0)) | |
| selected_books = [state["books_with_metadata"][i-1] for i in indices if 0 < i <= len(state["books_with_metadata"])][:NUM_FINAL_BOOKS] | |
| else: | |
| # Fallback to first 3 books | |
| print(f"[DEBUG AGENT] No JSON array found, using first 3 books") | |
| selected_books = state["books_with_metadata"][:NUM_FINAL_BOOKS] | |
| except Exception as e: | |
| state["reasoning"].append(f"❌ Failed to parse book selection: {e}. Using first 3 books.") | |
| selected_books = state["books_with_metadata"][:NUM_FINAL_BOOKS] | |
| state["final_books"] = selected_books | |
| state["reasoning"].append(f"🧠 REASONING (Book Selection):\n{reasoning}\n\n→ Selected: {[b['title'] for b in selected_books]}") | |
| return state | |
| def generate_soundtrack(state: AgentState) -> AgentState: | |
| """Node: Generate ambient soundtrack using ElevenLabs Music API""" | |
| print(f"[DEBUG AGENT] generate_soundtrack node started") | |
| import requests | |
| import tempfile | |
| ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY") | |
| print(f"[DEBUG AGENT] ELEVENLABS_API_KEY present: {bool(ELEVENLABS_API_KEY)}") | |
| if not ELEVENLABS_API_KEY: | |
| print(f"[DEBUG AGENT] No ELEVENLABS_API_KEY - skipping") | |
| state["reasoning"].append("⚠️ ELEVENLABS_API_KEY not set - skipping soundtrack generation") | |
| state["soundtrack_url"] = "" | |
| return state | |
| try: | |
| # Build vibe context for music prompt generation | |
| vibe_context = { | |
| "feels_like": state["feels_like"], | |
| "mood_atmosphere": state["mood_atmosphere"], | |
| "aesthetic_genre_keywords": state["aesthetic_genre_keywords"], | |
| "core_themes": state["core_themes"], | |
| "tropes": state["tropes"] | |
| } | |
| print(f"[DEBUG AGENT] vibe_context built: {list(vibe_context.keys())}") | |
| # Use LLM to generate music prompt from vibe context | |
| from prompts import MUSIC_PROMPT_GENERATION | |
| messages = [ | |
| {"role": "system", "content": MUSIC_PROMPT_GENERATION}, | |
| {"role": "user", "content": f"Generate a music prompt based on this vibe:\n{json.dumps(vibe_context, indent=2)}"} | |
| ] | |
| print(f"[DEBUG AGENT] Calling LLM for music prompt...") | |
| music_prompt, reasoning = call_llm(messages, temperature=0.7, model=FAST_MODEL, include_reasoning=True) | |
| print(f"[DEBUG AGENT] Music prompt generated: {music_prompt[:100] if music_prompt else 'None'}...") | |
| state["reasoning"].append(f"🎵 Music prompt: {music_prompt}") | |
| # Call ElevenLabs Music API directly | |
| print(f"[DEBUG AGENT] Calling ElevenLabs Music API...") | |
| state["reasoning"].append(f"🎵 Calling ElevenLabs Music API...") | |
| response = requests.post( | |
| "https://api.elevenlabs.io/v1/music", | |
| headers={ | |
| "xi-api-key": ELEVENLABS_API_KEY, | |
| "Content-Type": "application/json" | |
| }, | |
| json={ | |
| "prompt": music_prompt, | |
| "music_length_ms": 90000, # 1:30 minute | |
| "force_instrumental": True # No vocals, just ambient music | |
| }, | |
| timeout=120 # Music generation can take a while | |
| ) | |
| print(f"[DEBUG AGENT] ElevenLabs response status: {response.status_code}") | |
| if response.status_code == 200: | |
| print(f"[DEBUG AGENT] Success! Response size: {len(response.content)} bytes") | |
| # Save the audio data to a temp file | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') | |
| temp_file.write(response.content) | |
| temp_file.close() | |
| print(f"[DEBUG AGENT] Saved to temp file: {temp_file.name}") | |
| state["soundtrack_url"] = temp_file.name | |
| state["reasoning"].append(f"✅ Generated soundtrack successfully ({len(response.content)} bytes)") | |
| else: | |
| print(f"[DEBUG AGENT] ElevenLabs API error: {response.status_code} - {response.text[:500]}") | |
| state["reasoning"].append(f"❌ ElevenLabs API error: {response.status_code} - {response.text[:200]}") | |
| state["soundtrack_url"] = "" | |
| except Exception as e: | |
| import traceback | |
| print(f"[DEBUG AGENT] Exception in generate_soundtrack: {e}") | |
| traceback.print_exc() | |
| state["reasoning"].append(f"❌ Failed to generate soundtrack: {e}") | |
| state["soundtrack_url"] = "" | |
| print(f"[DEBUG AGENT] generate_soundtrack finished, soundtrack_url: {state.get('soundtrack_url', 'not set')}") | |
| return state | |
| def present_final_results(state: AgentState) -> AgentState: | |
| """Node: Format and present final results to user""" | |
| # Format books for display | |
| books_text = "Here are your personalized book recommendations:\n\n" | |
| for i, book in enumerate(state["final_books"], 1): | |
| books_text += f"{i}. **{book['title']}** by {book['author']}\n" | |
| state["messages"].append({ | |
| "role": "assistant", | |
| "content": books_text + f"\n\nI'm also generating a soundtrack that matches your vibe! Scroll down for all the goodies ⬇️" | |
| }) | |
| state["reasoning"].append("Presented final results to user") | |
| return state | |
| # ============================================================================ | |
| # GRAPH CONSTRUCTION | |
| # ============================================================================ | |
| def create_agent_graph(): | |
| """Create and compile the LangGraph workflow with interrupts for user input""" | |
| from langgraph.checkpoint.memory import MemorySaver | |
| # Initialize graph | |
| workflow = StateGraph(AgentState) | |
| # Add nodes | |
| workflow.add_node("generate_initial_vibe", generate_initial_vibe) | |
| workflow.add_node("refine_vibe", refine_vibe) | |
| workflow.add_node("retrieve_books", retrieve_books) | |
| workflow.add_node("fetch_metadata", fetch_book_metadata) | |
| workflow.add_node("generate_q1", generate_question_1) | |
| workflow.add_node("wait_a1", wait_for_answer_1) | |
| workflow.add_node("generate_q2", generate_question_2) | |
| workflow.add_node("wait_a2", wait_for_answer_2) | |
| workflow.add_node("finalize_books", finalize_books) | |
| workflow.add_node("generate_soundtrack", generate_soundtrack) | |
| workflow.add_node("present_results", present_final_results) | |
| # Set entry point | |
| workflow.set_entry_point("generate_initial_vibe") | |
| # After initial vibe, check if user is satisfied or wants refinement | |
| workflow.add_conditional_edges( | |
| "generate_initial_vibe", | |
| check_vibe_satisfaction, | |
| { | |
| "refine": "refine_vibe", | |
| "retrieve": "retrieve_books" | |
| } | |
| ) | |
| # After refinement, check again if user is satisfied | |
| workflow.add_conditional_edges( | |
| "refine_vibe", | |
| check_vibe_satisfaction, | |
| { | |
| "refine": "refine_vibe", | |
| "retrieve": "retrieve_books" | |
| } | |
| ) | |
| # Sequential: retrieve -> fetch -> generate Q1 -> wait A1 -> generate Q2 -> wait A2 -> finalize | |
| workflow.add_edge("retrieve_books", "fetch_metadata") | |
| workflow.add_edge("fetch_metadata", "generate_q1") | |
| workflow.add_edge("generate_q1", "wait_a1") | |
| workflow.add_edge("wait_a1", "generate_q2") | |
| workflow.add_edge("generate_q2", "wait_a2") | |
| workflow.add_edge("wait_a2", "finalize_books") | |
| # Sequential: finalize -> soundtrack -> present | |
| workflow.add_edge("finalize_books", "generate_soundtrack") | |
| workflow.add_edge("generate_soundtrack", "present_results") | |
| workflow.add_edge("present_results", END) | |
| # Compile with checkpointer for state persistence | |
| memory = MemorySaver() | |
| return workflow.compile(checkpointer=memory) | |
| # ============================================================================ | |
| # MAIN INTERFACE | |
| # ============================================================================ | |
| # Global graph instance with persistent checkpointer | |
| _GRAPH_INSTANCE = None | |
| def get_graph(): | |
| """Get or create the compiled graph with checkpointer""" | |
| global _GRAPH_INSTANCE | |
| if _GRAPH_INSTANCE is None: | |
| print(f"[DEBUG AGENT] Creating NEW graph instance!") | |
| _GRAPH_INSTANCE = create_agent_graph() | |
| else: | |
| print(f"[DEBUG AGENT] Reusing existing graph instance") | |
| return _GRAPH_INSTANCE | |
| def reset_agent(): | |
| """Reset the agent by clearing the graph instance""" | |
| global _GRAPH_INSTANCE | |
| _GRAPH_INSTANCE = None | |
| def run_agent(images: List[str], user_message: str = None, thread_id: str = "main"): | |
| """ | |
| Main interface to run the agent with interrupt-based human-in-the-loop | |
| Args: | |
| images: List of image URLs/base64 for initial upload | |
| user_message: User's message (for resuming after interrupt) | |
| thread_id: Unique identifier for the user session (required for multi-user support) | |
| Returns: | |
| Updated state with agent's response | |
| """ | |
| from langgraph.types import Command | |
| graph = get_graph() | |
| thread_config = {"configurable": {"thread_id": thread_id}} | |
| # Initialize state if new conversation (images provided) | |
| if images and len(images) > 0: | |
| initial_state = AgentState( | |
| images=images, | |
| messages=[], | |
| aesthetic_genre_keywords=[], | |
| mood_atmosphere=[], | |
| core_themes=[], | |
| tropes=[], | |
| feels_like="", | |
| vibe_refinement_count=0, | |
| retrieved_books=[], | |
| books_with_metadata=[], | |
| q1_question="", | |
| q2_question="", | |
| user_preferences={}, | |
| final_books=[], | |
| soundtrack_url="", | |
| reasoning=[] | |
| ) | |
| # Start the graph - it will stop at first interrupt() | |
| result = graph.invoke(initial_state, thread_config) | |
| return result | |
| # Resume with user's message | |
| if user_message: | |
| # Check current state before resuming | |
| current_state = graph.get_state(thread_config) | |
| print(f"[DEBUG AGENT] State BEFORE resume:") | |
| print(f"[DEBUG AGENT] messages count: {len(current_state.values.get('messages', []))}") | |
| for i, m in enumerate(current_state.values.get('messages', [])): | |
| print(f"[DEBUG AGENT] msg[{i}]: {m.get('role')} - {m.get('content', '')[:60]}...") | |
| print(f"[DEBUG AGENT] q1_question: '{current_state.values.get('q1_question', '')[:50] if current_state.values.get('q1_question') else 'EMPTY'}'") | |
| # Resume from the last interrupt; the value passed to Command(resume=...) | |
| # is what the corresponding interrupt(...) call will return inside the node. | |
| print(f"[DEBUG AGENT] Resuming graph with user_message: {user_message[:50]}...") | |
| result = graph.invoke(Command(resume=user_message), thread_config) | |
| print(f"[DEBUG AGENT] graph.invoke returned: {type(result)}, keys: {list(result.keys()) if hasattr(result, 'keys') else 'N/A'}") | |
| print(f"[DEBUG AGENT] result has {len(result.get('messages', []))} messages") | |
| # Remove __interrupt__ key if present before returning | |
| if "__interrupt__" in result: | |
| result = {k: v for k, v in result.items() if k != "__interrupt__"} | |
| return result | |
| return None | |