# NOTE: "Spaces: Sleeping" status-banner text from a Hugging Face Space export
# was removed here — it was UI residue, not part of the source.
# Standard library
import logging
import os
from contextlib import asynccontextmanager

# Third-party
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from neo4j import GraphDatabase, basic_auth
import google.generativeai as genai

# --- Logging Configuration ---
# Emit INFO and above with a timestamped, module-qualified format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)  # Logger instance for this module

# --- Environment Variable Configuration ---
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")

# Fail loudly (in the log) at import time if the Neo4j credentials are
# missing; the endpoints additionally return HTTP 500 in that state.
if not NEO4J_URI or not NEO4J_USER or not NEO4J_PASSWORD:
    logger.critical("CRITICAL ERROR: NEO4J_URI, NEO4J_USER, and NEO4J_PASSWORD environment variables must be set.")
| # --- Application Lifecycle (Startup/Shutdown) --- | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Handles startup and shutdown events.

    Startup configures the Gemini client from the GEMINI_API_KEY environment
    variable; shutdown only logs. FastAPI requires the lifespan callable to
    be an async context manager, so the @asynccontextmanager decorator
    (imported above but previously never applied) is mandatory here.
    """
    # --- Startup: initialize Gemini client ---
    logger.info("Initializing Gemini client...")
    if genai:
        try:
            # BUGFIX(review): the original fell back to `getattr(settings, ...)`
            # but no `settings` object exists in this module, which raised a
            # NameError whenever the env var was unset. Env var only now.
            api_key = os.getenv("GEMINI_API_KEY")
            if not api_key:
                raise ValueError("GEMINI_API_KEY not found in environment or settings.")
            genai.configure(api_key=api_key)
            logger.info("Gemini client configured successfully.")
        except Exception as e:
            logger.error(f"Failed to configure Gemini client: {e}", exc_info=True)
    else:
        logger.warning("Gemini library not imported. Endpoints requiring Gemini will not work.")
    yield  # API runs here
    # --- Shutdown ---
    logger.info("API shutting down...")
    logger.info("API shutdown complete.")
# FastAPI application instance; `lifespan` wires the startup/shutdown hooks.
app = FastAPI(
    title="Neo4j Importer",
    description="API to fetch documents, summarize it with Gemini, and add it to Neo4j.",
    version="1.0.0",
    lifespan=lifespan,
)
| # --- Utility Functions (Adapted from your script) --- | |
def get_content(number: str, node_type: str) -> str:
    """Fetches raw HTML content from Google Patents or Arxiv.

    Args:
        number: Patent number or Arxiv identifier.
        node_type: "Patent" (Google Patents) or "ResearchPaper" (Arxiv).

    Returns:
        The decoded page content with newlines stripped, or "" on any
        failure (unknown node_type, network error, bad HTTP status).
    """
    redirect_links = {
        "Patent": f"https://patents.google.com/patent/{number}/en",
        "ResearchPaper": f"https://arxiv.org/abs/{number}"
    }
    url = redirect_links.get(node_type)
    if not url:
        logger.warning(f"Unknown node type: {node_type} for number {number}")
        return ""
    try:
        # BUGFIX(review): a timeout is mandatory — without one a stalled
        # remote server hangs this worker indefinitely.
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Raises HTTPError for bad responses (4XX or 5XX)
        return response.content.decode('utf-8', errors='replace').replace("\n", "")
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error for {node_type} number: {number} at URL {url}: {e}")
        return ""
    except Exception as e:
        logger.error(f"An unexpected error occurred in get_content for {number}: {e}")
        return ""
def extract_arxiv(rp_number: str, node_type: str = "ResearchPaper") -> dict:
    """Extracts title/abstract from an Arxiv abs page and summarizes it.

    Args:
        rp_number: Arxiv identifier (e.g. "2401.12345").
        node_type: Label routed to get_content(); defaults to "ResearchPaper".

    Returns:
        Dict with keys "document", "title", "abstract", "summary". Fields
        keep descriptive error strings when fetching/parsing/summarizing fails
        — this function never raises.
    """
    rp_data = {
        "document": f"Arxiv {rp_number}",  # ID for the paper
        "title": "Error fetching content or content not found",
        "abstract": "Error fetching content or content not found",
        "summary": "Summary not yet generated"  # Default summary
    }
    raw_content = get_content(rp_number, node_type)
    if not raw_content:
        logger.warning(f"No content fetched for Arxiv ID: {rp_number}")
        return rp_data  # Returns default error data
    try:
        soup = BeautifulSoup(raw_content, 'html.parser')
        # --- Extract Title ---
        # Arxiv renders the title as <h1 class="title"><span class="descriptor">Title:</span> ...</h1>
        title_tag = soup.find('h1', class_='title')
        if title_tag and title_tag.find('span', class_='descriptor'):
            title_text = title_tag.find('span', class_='descriptor').next_sibling
            if title_text and isinstance(title_text, str):
                rp_data["title"] = title_text.strip()
            else:
                rp_data["title"] = title_tag.get_text(separator=" ", strip=True).replace("Title:", "").strip()
        elif title_tag:  # Fallback if the span descriptor is not there but h1.title exists
            rp_data["title"] = title_tag.get_text(separator=" ", strip=True).replace("Title:", "").strip()
        # --- Extract Abstract ---
        abstract_tag = soup.find('blockquote', class_='abstract')
        if abstract_tag:
            abstract_text = abstract_tag.get_text(strip=True)
            if abstract_text.lower().startswith('abstract'):  # Strip the "Abstract:" prefix, case-insensitively
                prefix_end = abstract_text.lower().find('abstract') + len('abstract')
                if prefix_end < len(abstract_text) and abstract_text[prefix_end] == ':':
                    prefix_end += 1  # Include the colon in removal
                abstract_text = abstract_text[prefix_end:].strip()
            rp_data["abstract"] = abstract_text
        # Mark if title or abstract are still not found
        if rp_data["title"] == "Error fetching content or content not found" and not title_tag:
            rp_data["title"] = "Title not found on page"
        if rp_data["abstract"] == "Error fetching content or content not found" and not abstract_tag:
            rp_data["abstract"] = "Abstract not found on page"
    except Exception as e:
        logger.error(f"Failed to parse content for Arxiv ID {rp_number}: {e}")
    # Generate summary with the Gemini API only when a real abstract exists.
    if rp_data["abstract"] and \
       not rp_data["abstract"].startswith("Error fetching content") and \
       not rp_data["abstract"].startswith("Abstract not found"):
        # BUGFIX(review): the closing delimiter was "<document>"; it must be
        # "</document>" so the wrapped document is properly delimited.
        prompt = f"""You are a 3GPP standardization expert. Summarize the key information in the provided document in simple technical English relevant to identifying potential Key Issues.
Focus on challenges, gaps, or novel aspects.
Here is the document: <document>{rp_data['abstract']}</document>"""
        try:
            model = genai.GenerativeModel("gemini-2.5-flash-preview-05-20")
            response = model.generate_content(prompt)
            rp_data["summary"] = response.text
            logger.info(f"Summary generated for Arxiv ID: {rp_number}")
        except Exception as e:
            logger.error(f"Error generating summary with Gemini for Arxiv ID {rp_number}: {e}")
            rp_data["summary"] = "Error generating summary (API failure)"
    else:
        rp_data["summary"] = "Summary not generated (Abstract unavailable or problematic)"
    return rp_data
def extract_google_patents(patent_number: str, node_type: str = "Patent") -> dict:
    """Extracts title/description/claims from a Google Patents page and summarizes it.

    Args:
        patent_number: Patent publication number (e.g. "US1234567B2").
        node_type: Label routed to get_content(); defaults to "Patent".

    Returns:
        Dict with keys "number", "title", "description", "claim", "summary".
        Fields keep descriptive error strings on failure — never raises.
    """
    # Initialize a dictionary with default error messages for consistency.
    patent_data = {
        "number": f"{patent_number}",
        "title": "Error fetching content or content not found",
        "description": "Error fetching content or content not found",
        "claim": "Error fetching content or content not found",
        "summary": "Summary not yet generated"  # Default summary
    }
    # Use the generic get_content function to fetch the raw page content.
    raw_content = get_content(patent_number, node_type)
    if not raw_content:
        logger.warning(f"No content fetched for Patent ID: {patent_number}")
        return patent_data  # Return the dictionary with default error messages.
    try:
        soup = BeautifulSoup(raw_content, 'html.parser')
        # --- Extract Title ---
        title_tag = soup.find('meta', attrs={'name': 'DC.title'})
        if title_tag and title_tag.get('content'):
            patent_data["title"] = title_tag['content'].strip()
        else:
            # Fallback to finding the title in an <h1> tag.
            title_h1 = soup.find('h1', id='title')
            if title_h1:
                patent_data["title"] = title_h1.get_text(strip=True)
        # --- Extract Description ---
        description_section = soup.find('section', itemprop='description')
        if description_section:
            # Remove machine-translation source spans to clean the output.
            for src_text in description_section.find_all('span', class_='google-src-text'):
                src_text.decompose()
            patent_data["description"] = description_section.get_text(separator=' ', strip=True)
        # --- Extract Claims ---
        claims_section = soup.find('section', itemprop='claims')
        if claims_section:
            # Remove unnecessary nested spans here as well.
            for src_text in claims_section.find_all('span', class_='google-src-text'):
                src_text.decompose()
            patent_data["claim"] = claims_section.get_text(separator=' ', strip=True)
        # Update status message if specific sections were not found on the page.
        if patent_data["title"] == "Error fetching content or content not found":
            patent_data["title"] = "Title not found on page"
        if patent_data["description"] == "Error fetching content or content not found":
            patent_data["description"] = "Description not found on page"
        if patent_data["claim"] == "Error fetching content or content not found":
            patent_data["claim"] = "Claim not found on page"
    except Exception as e:
        # Catch any unexpected errors during the parsing process.
        logger.error(f"Failed to parse content for Patent ID {patent_number}: {e}")
    # Generate summary with the Gemini API only when a real description exists.
    if patent_data["description"] and \
       not patent_data["description"].startswith("Error fetching content") and \
       not patent_data["description"].startswith("Description not found"):
        # BUGFIX(review): closing delimiter corrected to "</document>".
        prompt = f"""You are a 3GPP standardization expert. Summarize the key information in the provided document in simple technical English relevant to identifying potential Key Issues.
Focus on challenges, gaps, or novel aspects.
Here is the document: <document>{patent_data['description']}</document>"""
        try:
            model = genai.GenerativeModel("gemini-2.5-flash-preview-05-20")
            response = model.generate_content(prompt)
            patent_data["summary"] = response.text
            logger.info(f"Summary generated for Patent ID: {patent_number}")
        except Exception as e:
            logger.error(f"Error generating summary with Gemini for Patent ID {patent_number}: {e}")
            patent_data["summary"] = "Error generating summary (API failure)"
    else:
        # BUGFIX(review): the original assigned to `rp_data` (a name from the
        # sibling Arxiv function), raising NameError on this path.
        patent_data["summary"] = "Summary not generated (Description unavailable or problematic)"
    return patent_data
def add_nodes_to_neo4j(driver, data_list: list, node_type: str):
    """Adds a list of nodes to Neo4j in a single write transaction.

    Args:
        driver: An open neo4j Driver instance.
        data_list: List of property dicts, one per node to create.
        node_type: Label applied to the created nodes. NOTE: interpolated
            directly into the Cypher text (labels cannot be parameterized),
            so only trusted, hard-coded labels ("Patent"/"ResearchPaper")
            must ever be passed — never user input.

    Returns:
        The number of nodes actually created (0 when data_list is empty).

    Raises:
        HTTPException: 500 on any database failure.
    """
    if not data_list:
        logger.warning("No data provided to add_nodes_to_neo4j.")
        return 0
    # CREATE (not MERGE): re-importing the same document creates duplicate
    # nodes. Switch to MERGE on a unique key if idempotency is needed.
    query = (
        "UNWIND $data as properties "
        f"CREATE (n:{node_type}) "
        "SET n = properties"
    )
    try:
        with driver.session(database="neo4j") as session:  # Specify database if not default
            result = session.execute_write(lambda tx: tx.run(query, data=data_list).consume())
        nodes_created = result.counters.nodes_created
        if nodes_created > 0:
            logger.info(f"{nodes_created} new {node_type} node(s) added successfully.")
        else:
            # Previously this case was silent; surface it for diagnosability.
            logger.warning(f"Write transaction completed but created no {node_type} nodes.")
        return nodes_created  # Return the number of nodes actually created
    except Exception as e:
        logger.error(f"Neo4j Error - Failed to add/update {node_type} nodes: {e}")
        raise HTTPException(status_code=500, detail=f"Neo4j database error: {e}")
# --- FastAPI Endpoints ---

# NOTE(review): the route decorator was missing although the comment calls
# this the "API state check route" — without it the route is never
# registered. Path "/" assumed; confirm against the original deployment.
@app.get("/")
def read_root():
    """Health/state check endpoint."""
    return {"status": "ok"}
# NOTE(review): decorator restored — the "201 Created" comment implies a POST
# route with status_code=201; the path is assumed, confirm before deploying.
@app.post("/research_paper/{arxiv_id}", status_code=201)
async def add_single_research_paper(arxiv_id: str):
    """
    Fetches a research paper from Arxiv by its ID, extracts information,
    generates a summary, and adds it as a 'ResearchPaper' node in Neo4j.

    Raises:
        HTTPException: 500 when Neo4j is unconfigured or the write fails;
            404 when the Arxiv page cannot be fetched or parsed.
    """
    node_type = "ResearchPaper"
    logger.info(f"Processing request for Arxiv ID: {arxiv_id}")
    if not NEO4J_URI or not NEO4J_USER or not NEO4J_PASSWORD:
        logger.error("Neo4j database connection details are not configured on the server.")
        raise HTTPException(status_code=500, detail="Neo4j database connection details are not configured on the server.")
    # Step 1: Extract paper data
    paper_data = extract_arxiv(arxiv_id, node_type)
    if paper_data["title"].startswith("Error fetching content") or paper_data["title"] == "Title not found on page":
        logger.warning(f"Could not fetch or parse content for Arxiv ID {arxiv_id}. Title: {paper_data['title']}")
        raise HTTPException(status_code=404, detail=f"Could not fetch or parse content for Arxiv ID {arxiv_id}. Title: {paper_data['title']}")
    # Step 2: Add to Neo4j
    driver_instance = None  # Initialize for the finally block
    try:
        auth_token = basic_auth(NEO4J_USER, NEO4J_PASSWORD)
        driver_instance = GraphDatabase.driver(NEO4J_URI, auth=auth_token)
        driver_instance.verify_connectivity()
        logger.info("Successfully connected to Neo4j.")
        nodes_created_count = add_nodes_to_neo4j(driver_instance, [paper_data], node_type)
        if nodes_created_count > 0:
            logger.info(f"Research paper {arxiv_id} was successfully added to Neo4j.")
        # The HTTP 201 comes from the route decorator; the previously unused
        # `status_code_response` local has been removed.
        return {"data": paper_data}
    except HTTPException as e:  # Re-raise HTTPExceptions untouched
        logger.error(f"HTTPException during Neo4j operation for {arxiv_id}: {e.detail}")
        raise e
    except Exception as e:
        logger.error(f"An unexpected error occurred during Neo4j operation for {arxiv_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"An unexpected server error occurred: {e}")
    finally:
        if driver_instance:
            driver_instance.close()
            logger.info("Neo4j connection closed.")
# NOTE(review): decorator restored — the "201 Created" comment implies a POST
# route with status_code=201; the path is assumed, confirm before deploying.
@app.post("/patent/{patent_id}", status_code=201)
async def add_single_patent(patent_id: str):
    """
    Fetches a patent from Google Patents by its ID, extracts information,
    generates a summary, and adds it as a 'Patent' node in Neo4j.

    Raises:
        HTTPException: 500 when Neo4j is unconfigured or the write fails;
            404 when the patent page cannot be fetched or parsed.
    """
    node_type = "Patent"
    logger.info(f"Processing request for Patent ID: {patent_id}")
    if not NEO4J_URI or not NEO4J_USER or not NEO4J_PASSWORD:
        logger.error("Neo4j database connection details are not configured on the server.")
        raise HTTPException(status_code=500, detail="Neo4j database connection details are not configured on the server.")
    # Step 1: Extract patent data
    patent_data = extract_google_patents(patent_id, node_type)
    if patent_data["title"].startswith("Error fetching content") or patent_data["title"] == "Title not found on page":
        logger.warning(f"Could not fetch or parse content for Patent ID {patent_id}. Title: {patent_data['title']}")
        raise HTTPException(status_code=404, detail=f"Could not fetch or parse content for Patent ID {patent_id}. Title: {patent_data['title']}")
    # Step 2: Add to Neo4j
    driver_instance = None  # Initialize for the finally block
    try:
        auth_token = basic_auth(NEO4J_USER, NEO4J_PASSWORD)
        driver_instance = GraphDatabase.driver(NEO4J_URI, auth=auth_token)
        driver_instance.verify_connectivity()
        logger.info("Successfully connected to Neo4j.")
        nodes_created_count = add_nodes_to_neo4j(driver_instance, [patent_data], node_type)
        if nodes_created_count > 0:
            logger.info(f"Patent {patent_id} was successfully added to Neo4j.")
        # The HTTP 201 comes from the route decorator; the previously unused
        # `status_code_response` local has been removed.
        return {"data": patent_data}
    except HTTPException as e:  # Re-raise HTTPExceptions untouched
        logger.error(f"HTTPException during Neo4j operation for {patent_id}: {e.detail}")
        raise e
    except Exception as e:
        logger.error(f"An unexpected error occurred during Neo4j operation for {patent_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"An unexpected server error occurred: {e}")
    finally:
        if driver_instance:
            driver_instance.close()
            logger.info("Neo4j connection closed.")