import json
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from pinecone import Pinecone, ServerlessSpec

# Added caching to reduce consecutive startup time
# --@Qamar


def slugify_technique(name):
    """Convert a technique name into a slug usable in a Pinecone index name.

    Lower-cases *name*, collapses every run of characters outside
    ``[a-z0-9]`` into a single hyphen, and trims hyphens from both ends.

    >>> slugify_technique("Sentence Splitter")
    'sentence-splitter'
    """
    return re.sub(r'[^a-z0-9]+', '-', name.lower()).strip('-')


def get_index_by_name(api_key: str, index_name: str):
    """Connect to an existing Pinecone index by its full name.

    Useful on the API/production side, where the index name is already known.

    Args:
        api_key: Pinecone API key used to build the client.
        index_name: Exact name of the index to connect to.

    Returns:
        The ``Index`` handle returned by ``Pinecone.Index``.

    Raises:
        ValueError: If no index with that name appears in ``list_indexes()``.
    """
    pc = Pinecone(api_key=api_key)

    # Check existence first so a missing index gives a clear error instead of a 404.
    existing_indexes = [idx.name for idx in pc.list_indexes()]
    if index_name not in existing_indexes:
        raise ValueError(f"Index '{index_name}' does not exist in your Pinecone project.")

    print(f" Connecting to Index: {index_name}")
    return pc.Index(index_name)


def get_pinecone_index(
    api_key,
    base_name,
    technique,
    dimension=384,
    metric="cosine",
    *,
    ready_timeout_s: Optional[float] = None,
    poll_interval_s: float = 1.0,
):
    """Create (if needed) and return the index dedicated to one chunking technique.

    The index name is ``f"{base_name}-{slugify_technique(technique)}"``,
    e.g. ``'arxiv-index-token'``. A missing index is created as a serverless
    index on AWS ``us-east-1``. The function then polls ``describe_index``
    until the index reports ready.

    Args:
        api_key: Pinecone API key.
        base_name: Prefix shared by all technique indexes.
        technique: Human-readable technique name; it is slugified.
        dimension: Vector dimension used when creating the index.
        metric: Similarity metric used when creating the index.
        ready_timeout_s: Maximum seconds to wait for a newly created index to
            become ready. ``None`` (the default) waits indefinitely, which
            matches the historical behaviour.
        poll_interval_s: Seconds to sleep between readiness polls.

    Returns:
        The ``Index`` handle, obtained through ``get_index_by_name``.

    Raises:
        TimeoutError: If ``ready_timeout_s`` elapses before the index is ready.
        ValueError: Propagated from ``get_index_by_name`` if the index is
            not listed.
    """
    pc = Pinecone(api_key=api_key)
    tech_slug = slugify_technique(technique)
    full_index_name = f"{base_name}-{tech_slug}"

    existing_indexes = [idx.name for idx in pc.list_indexes()]

    if full_index_name not in existing_indexes:
        print(f" Creating specialized index: {full_index_name}...")
        pc.create_index(
            name=full_index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
        # Wait for the index to spin up; optionally bounded so a stuck
        # provisioning step cannot hang the caller forever.
        deadline = None if ready_timeout_s is None else time.monotonic() + ready_timeout_s
        while not pc.describe_index(full_index_name).status['ready']:
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Index '{full_index_name}' was not ready after {ready_timeout_s} seconds."
                )
            time.sleep(poll_interval_s)

    return get_index_by_name(api_key, full_index_name)


def refresh_pinecone_index(index, final_chunks, batch_size=100):
    """Populate a technique-specific index when it holds fewer vectors than expected.

    The index is compared to *final_chunks* by ``total_vector_count`` only:
    every chunk is (re-)upserted when the index has fewer vectors than there
    are chunks; otherwise nothing is written.

    Args:
        index: Pinecone ``Index`` handle.
        final_chunks: Chunk dicts in the shape accepted by
            ``prepare_vectors_for_upsert``.
        batch_size: Number of vectors per ``upsert`` call.

    Returns:
        ``True`` if an upsert was performed. ``False`` if there was nothing
        to do, or if any error occurred (errors are printed, not raised).
    """
    if not final_chunks:
        print("No chunks provided to refresh.")
        return False

    try:
        # Check current stats for this specific index.
        stats = index.describe_index_stats()
        current_count = stats.get('total_vector_count', 0)
        expected_count = len(final_chunks)

        print(f" Index Stats -> Existing: {current_count} | New Chunks: {expected_count}")

        if current_count >= expected_count:
            print(f" Index is already populated with {current_count} vectors. Ready for search.")
            return False

        # Either empty or short on vectors: both cases re-upsert every chunk.
        if current_count == 0:
            print(f"➕ Index is empty. Upserting {expected_count} vectors...")
        else:
            print(f" Vector count mismatch ({current_count} < {expected_count}). Updating index...")
        vectors = prepare_vectors_for_upsert(final_chunks)
        upsert_to_pinecone(index, vectors, batch_size)
        return True

    except Exception as e:
        # Deliberate best-effort: report the failure and let the caller continue.
        print(f" Error refreshing index: {e}")
        return False


# Upsert helpers
def prepare_vectors_for_upsert(final_chunks):
    """Convert chunk dicts into the vector dicts passed to ``Index.upsert``.

    Each chunk must have ``'id'`` and ``'values'`` keys. Only a fixed set of
    metadata fields is kept; missing fields get defaults. A missing or
    ``None`` ``'metadata'`` entry is treated as empty.
    """
    vectors = []
    for chunk in final_chunks:
        meta = chunk.get('metadata') or {}
        vectors.append({
            'id': chunk['id'],
            'values': chunk['values'],
            'metadata': {
                'text': meta.get('text', ""),
                'title': meta.get('title', ""),
                'url': meta.get('url', ""),
                'chunk_index': meta.get('chunk_index', 0),
                'technique': meta.get('technique', "unknown"),
                'chunking_technique': meta.get('chunking_technique', "unknown"),
            },
        })
    return vectors


def upsert_to_pinecone(index, chunks, batch_size=100):
    """Upsert *chunks* into *index* in consecutive slices of *batch_size*.

    Raises:
        ValueError: If ``batch_size`` is not positive. A negative value
            would otherwise silently upsert nothing.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        index.upsert(vectors=batch)


# Loading chunks back from Pinecone, with a local cache to speed up BM25 initialization
def _sanitize_index_name(index_name: str) -> str:
    """Make *index_name* safe as a file-name component (fallback ``'default-index'``)."""
    return re.sub(r'[^a-zA-Z0-9._-]+', '-', index_name).strip('-') or 'default-index'


def _chunk_cache_path(cache_dir: str, index_name: str) -> Path:
    """Return the cache file path for *index_name*, creating *cache_dir* if needed."""
    cache_root = Path(cache_dir)
    cache_root.mkdir(parents=True, exist_ok=True)
    safe_name = _sanitize_index_name(index_name)
    return cache_root / f"bm25_chunks_{safe_name}.json"


def _read_chunk_cache(path: Path) -> Dict[str, Any]:
    """Load and return the JSON payload stored at *path*."""
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def _write_chunk_cache(path: Path, payload: Dict[str, Any]) -> None:
    """Write *payload* as JSON to *path* via a temp file and rename.

    The payload is written to a sibling ``.tmp`` file, which then replaces
    *path*. An interrupted write therefore cannot leave a truncated cache
    at *path*. On failure the temp file is removed and the error re-raised.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(payload, f)
        tmp_path.replace(path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise


def load_chunks_with_local_cache(
    index,
    index_name: str,
    cache_dir: str = ".cache",
    batch_size: int = 100,
    force_refresh: bool = False,
) -> tuple[List[Dict[str, Any]], str]:
    """Return the BM25 chunk corpus, preferring a local JSON cache.

    The cache is used only when all of these hold:
      - ``force_refresh`` is false;
      - the cache file exists and can be read;
      - its stored ``vector_count`` equals the index's current
        ``total_vector_count``;
      - its chunk list is non-empty.

    Otherwise chunks are reloaded with ``load_chunks_from_pinecone`` and the
    cache is rewritten. Cache read/write failures are printed and never raised.

    Returns:
        ``(chunks, source)``, where ``source`` is ``"cache"`` or ``"pinecone"``.
    """
    cache_file = _chunk_cache_path(cache_dir=cache_dir, index_name=index_name)
    stats = index.describe_index_stats()
    current_count = stats.get("total_vector_count", 0)

    if not force_refresh and cache_file.exists():
        try:
            cached_payload = _read_chunk_cache(cache_file)
            cached_meta = cached_payload.get("meta", {})
            cached_count = cached_meta.get("vector_count", -1)
            cached_chunks = cached_payload.get("chunks", [])

            if cached_count == current_count and cached_chunks:
                print(
                    f" Loaded BM25 chunk cache: {cache_file} "
                    f"(chunks={len(cached_chunks)}, vectors={cached_count})"
                )
                return cached_chunks, "cache"

            print(
                " BM25 cache stale or empty. "
                f"cache_vectors={cached_count}, pinecone_vectors={current_count}. Refreshing..."
            )
        except Exception as e:
            # A corrupt or unreadable cache is not fatal: fall back to Pinecone.
            print(f" Failed to read BM25 cache ({cache_file}): {e}. Refreshing from Pinecone...")

    chunks = load_chunks_from_pinecone(index=index, batch_size=batch_size)
    payload = {
        "meta": {
            "index_name": index_name,
            "vector_count": current_count,
            "updated_at_epoch_s": int(time.time()),
        },
        "chunks": chunks,
    }

    try:
        _write_chunk_cache(cache_file, payload)
        print(f" Saved BM25 chunk cache: {cache_file} (chunks={len(chunks)})")
    except Exception as e:
        print(f" Failed to write BM25 cache ({cache_file}): {e}")

    return chunks, "pinecone"


def load_chunks_from_pinecone(index, batch_size: int = 100) -> List[Dict[str, Any]]:
    """Scan the Pinecone index and collect text metadata for the BM25 corpus.

    Every namespace reported by ``describe_index_stats`` is visited; the
    default ``""`` namespace is used if none are reported. In each namespace,
    IDs are listed in pages of *batch_size* and the vectors are fetched.
    Vectors already seen (by ID, across namespaces), or without a ``text``
    metadata field, are skipped.

    NOTE(review): relies on ``Index.list``; confirm the target index type
    supports ID listing.

    Returns:
        A list of ``{"id": ..., "metadata": {"text", "title", "url",
        "chunk_index"}}`` dicts.
    """
    stats = index.describe_index_stats()
    namespaces = list((stats.get('namespaces') or {}).keys())

    # If no namespaces are explicitly named, Pinecone uses an empty string for the default.
    if not namespaces:
        namespaces = [""]

    all_chunks: List[Dict[str, Any]] = []
    seen_ids = set()

    print(f"Loading vectors for BM25 from namespaces: {namespaces}")

    for ns in namespaces:
        # Pinecone's list() generator yields batches of IDs.
        for id_batch in index.list(namespace=ns, limit=batch_size):
            if not id_batch:
                continue

            # Fetch the actual content (metadata) for this batch of IDs.
            fetched = index.fetch(ids=id_batch, namespace=ns)
            vectors = getattr(fetched, "vectors", None) or {}

            for vector_id, vector_data in vectors.items():
                if vector_id in seen_ids:
                    continue
                seen_ids.add(vector_id)

                # Vectors stored without metadata report None here; treat as empty.
                metadata = getattr(vector_data, "metadata", None) or {}
                text = metadata.get("text")

                if not text:
                    continue

                all_chunks.append({
                    "id": vector_id,
                    "metadata": {
                        "text": text,
                        "title": metadata.get("title", "Untitled"),
                        "url": metadata.get("url", ""),
                        "chunk_index": metadata.get("chunk_index", 0),
                    },
                })

        print(f" Finished namespace: '{ns if ns else 'default'}'")

    print(f"Total chunks loaded into memory: {len(all_chunks)}")
    return all_chunks