Spaces:
Runtime error
Runtime error
| import requests, os, zipfile, subprocess, re, warnings | |
| warnings.filterwarnings("ignore") | |
| os.environ["CURL_CA_BUNDLE"] = "" | |
| from io import BytesIO | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from datasets import load_dataset | |
| import fitz | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
# FastAPI application; the interactive docs are served at "/" and the
# long-form description is read from the local documentation.md at import
# time (the file must exist next to this module or import fails).
app = FastAPI(title="Specification Retriever/Splitter API",
              description=open('documentation.md').read(),
              docs_url="/")
origins = [
    "*",
]
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# contradictory per the CORS spec (browsers reject wildcard origins with
# credentials) — confirm whether credentials support is actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pre-indexed specification contents pulled from the Hugging Face Hub,
# flattened to lists of rows; each row appears to carry at least
# "doc_id", "section" and "content" keys (see the helpers below).
# NOTE(review): both downloads happen at import time and need network
# access — confirm this is acceptable for the service's startup path.
spec_contents_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecContent")
spec_contents_3gpp = spec_contents_3gpp["train"].to_list()
spec_contents_etsi = load_dataset("OrganizedProgrammers/ETSISpecContent")
spec_contents_etsi = spec_contents_etsi["train"].to_list()
# 3GPP spec IDs look like "23.501" or "23.501-2": two digits, a dot,
# three digits, and an optional "-<part>" suffix.
spec_3gpp_format = re.compile(r'^\d{2}\.\d{3}(?:-\d+)?')
# ETSI spec IDs look like "103 666" or "103 666-2".  The previous pattern
# used \d{,3} (i.e. {0,3}), which also matched degenerate inputs such as a
# lone space (zero digits on each side); require at least one digit.
spec_etsi_format = re.compile(r'^\d{1,3} \d{1,3}(?:-\d+)?')
class SpecRequest(BaseModel):
    """Request body for the spec endpoints: a single specification ID."""
    # e.g. "23.501" / "23.501-2" (3GPP) or "103 666" / "103 666-2" (ETSI)
    spec_id: str
def is_doc_indexed(spec_id: str) -> bool:
    """Return True if *spec_id* is present in either indexed dataset.

    Scans the module-level 3GPP and ETSI content lists loaded at import
    time and matches on the exact ``doc_id`` value.
    """
    # Generator expressions instead of the original
    # any([True if ... else False for ...]) — no throwaway lists, and
    # any() can short-circuit on the first match.
    return (any(s["doc_id"] == spec_id for s in spec_contents_3gpp)
            or any(s["doc_id"] == spec_id for s in spec_contents_etsi))
def get_doc(spec_id: str):
    """Concatenate every indexed section of *spec_id* into one text blob.

    Each matching row contributes "<section>\\n<content>"; sections are
    separated by a blank line.  Returns "" when nothing matches.
    """
    rows = spec_contents_3gpp + spec_contents_etsi
    parts = [
        f"{row['section']}\n{row['content']}"
        for row in rows
        if row["doc_id"] == spec_id
    ]
    return "\n\n".join(parts)
def get_structured_doc(spec_id: str):
    """Map section title -> section content for every indexed row of *spec_id*.

    Returns an empty dict when the document is not indexed.  When two rows
    share a section title, the later row wins — the same last-assignment
    semantics as a plain loop over the combined lists.
    """
    rows = spec_contents_3gpp + spec_contents_etsi
    return {
        row["section"]: row["content"]
        for row in rows
        if row["doc_id"] == spec_id
    }
def get_pdf_data(request: SpecRequest):
    """Download the document for *request.spec_id* and return (pdf, toc).

    ``pdf`` is an open PyMuPDF (fitz) document and ``toc`` its table of
    contents as returned by ``get_toc()``.
    Raises HTTPException 404 when the doc-finder lookup fails.

    NOTE(review): when the document is already indexed this returns the
    flat text string from get_doc() instead of a (pdf, toc) tuple —
    callers that unpack two values must only reach this after their own
    is_doc_indexed() check, as extract_full_spec() currently does.
    """
    specification = request.spec_id
    if is_doc_indexed(specification):
        return get_doc(specification)
    # Resolve the spec ID to a download URL via the external doc-finder
    # service.  NOTE(review): verify=False disables TLS verification
    # (CURL_CA_BUNDLE is also cleared at import time) — confirm intended.
    url = requests.post(
        "https://organizedprogrammers-docfinder.hf.space/find/single",
        verify=False,
        headers={"Content-Type": "application/json"},
        json={"doc_id": specification}
    )
    if url.status_code != 200:
        raise HTTPException(404, detail="Not found")
    url = url.json()['url']
    # Fetch the document itself with a browser-like User-Agent.
    response = requests.get(
        url,
        verify=False,
        headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
    )
    # Open the PDF from the in-memory response body.
    pdf = fitz.open(stream=response.content, filetype="pdf")
    return pdf, pdf.get_toc()
def extract_full_spec(request: SpecRequest):
    """Return the full text of a specification.

    Fast path: if the document is already indexed, return the flat text
    from the datasets as one string.  Otherwise download the source
    document: 3GPP IDs come as (possibly nested) ZIP archives of
    .doc/.docx files converted with headless LibreOffice; ETSI IDs come
    as PDFs read with PyMuPDF.

    NOTE(review): the return shape varies — a str for indexed/ETSI
    documents, a list of stripped lines for the 3GPP download path, and
    {} for an ETSI document with no text/ToC.  Callers must handle all
    shapes (extract_full_spec_by_chapters relies on the list for 3GPP).
    Raises HTTPException 404 when nothing is found and 400 when the ID
    matches neither known format.
    """
    specification = request.spec_id
    if is_doc_indexed(specification):
        return get_doc(specification)
    print(f"[WARNING] Document no. {specification} not indexed or is a TDoc, if it's a specification, try to reindex")
    total_file = []
    if spec_3gpp_format.match(specification):
        # Resolve the spec ID to a download URL via the doc-finder service.
        url = requests.post(
            "https://organizedprogrammers-docfinder.hf.space/find/single",
            verify=False,
            headers={"Content-Type": "application/json"},
            json={"doc_id": specification}
        )
        if url.status_code != 200:
            raise HTTPException(404, detail="Not found")
        url = url.json()['url']
        response = requests.get(
            url,
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
        )
        zip_bytes = BytesIO(response.content)
        current_zip_file = zipfile.ZipFile(zip_bytes)
        # Some archives wrap the real payload in a single nested ZIP;
        # unwrap exactly one level when the outer archive holds one .zip.
        for file_info in current_zip_file.infolist():
            if file_info.filename.endswith(".zip") and len(current_zip_file.namelist()) == 1:
                nested_zip_bytes = BytesIO(current_zip_file.read(file_info.filename))
                current_zip_file = zipfile.ZipFile(nested_zip_bytes)
                break
        for file_info in current_zip_file.infolist():
            filename = file_info.filename
            # Keep only the main Word document(s); skip covers and annexes.
            if (filename.endswith('.doc') or filename.endswith('.docx')) and ("cover" not in filename.lower() and "annex" not in filename.lower()):
                doc_bytes = current_zip_file.read(filename)
                ext = filename.split(".")[-1]
                input_path = f"/tmp/{specification}.{ext}"
                output_path = f"/tmp/{specification}.txt"
                with open(input_path, "wb") as f:
                    f.write(doc_bytes)
                # Convert to plain text with headless LibreOffice; the
                # output lands in /tmp with the same stem and a .txt suffix.
                subprocess.run([
                    "libreoffice",
                    "--headless",
                    "--convert-to", "txt",
                    "--outdir", "/tmp",
                    input_path
                ], check=True)
                # Collect non-empty, stripped lines, then clean up /tmp.
                with open(output_path, "r") as f:
                    txt_data = [line.strip() for line in f if line.strip()]
                os.remove(input_path)
                os.remove(output_path)
                total_file.extend(txt_data)
        if total_file == []:
            raise HTTPException(status_code=404, detail="Not found !")
        else:
            return total_file
    elif spec_etsi_format.match(specification):
        print("\n[INFO] Tentative de récupération du texte", flush=True)
        pdf, doc_toc = get_pdf_data(request)
        text = []
        first = 0
        # Skip front matter: start at the page of the first ToC entry
        # whose title begins with a digit (the first numbered chapter).
        for level, title, page in doc_toc:
            if title[0].isnumeric():
                first = page - 1
                break
        for page in pdf[first:]:
            text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
        text = "\n".join(text)
        if not text or not doc_toc:
            print("\n[ERREUR] Pas de texte/table of contents trouvé !")
            return {}
        print(f"\n[INFO] Texte {request.spec_id} récupéré", flush=True)
        return text
    else:
        raise HTTPException(status_code=400, detail="Document ID format invalid !")
def extract_full_spec_by_chapters(request: SpecRequest):
    """Return the specification split into {chapter title: chapter text}.

    Fast path: indexed documents come straight from the datasets.
    Otherwise the raw output of extract_full_spec() is segmented: 3GPP
    text (a list of lines) is split on "<number>\\t<TITLE>" heading lines;
    ETSI text (one string) is split at the positions of the PDF table-of-
    contents titles.
    Raises HTTPException 400 when the ID matches neither format.
    """
    specification = request.spec_id
    if is_doc_indexed(request.spec_id):
        return get_structured_doc(request.spec_id)
    print(f"[WARNING] Document no. {specification} not indexed or is a TDoc, if it's a specification, try to reindex")
    total_file = []  # NOTE(review): never used below — dead variable
    text = extract_full_spec(request)
    if spec_3gpp_format.match(specification):
        # 3GPP: `text` is a list of lines here; headings look like
        # "4.2.1\tTitle" (clause number, tab, title starting with A-Z/0-9).
        chapters = []
        chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+$")
        for i, line in enumerate(text):
            if chapter_regex.fullmatch(line):
                chapters.append((i, line))
        document = {}
        # Each chapter's body runs from the line after its heading up to
        # the next heading (or the end of the document for the last one).
        for i in range(len(chapters)):
            start_index, chapter_title = chapters[i]
            end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
            content_lines = text[start_index + 1 : end_index]
            document[chapter_title.replace('\t', " ")] = "\n".join(content_lines)
        return document
    elif spec_etsi_format.match(specification):
        def extract_sections(text, titles):
            # Slice `text` between consecutive title occurrences; collapse
            # whitespace and strip the title itself out of each section.
            sections = {}
            # Sort the titles by their position in the text.
            sorted_titles = sorted(titles, key=lambda t: text.find(t))
            for i, title in enumerate(sorted_titles):
                start = text.find(title)
                if i + 1 < len(sorted_titles):
                    end = text.find(sorted_titles[i + 1])
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
                else:
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
            return sections
        # NOTE(review): this re-downloads the PDF already fetched inside
        # extract_full_spec(); only `toc` is used here, `pdf` is unused.
        pdf, toc = get_pdf_data(request)
        if not text or not toc:
            print("\n[ERREUR] Pas de texte/table of contents trouvé !")
            return {}
        print(f"\n[INFO] Texte {request.spec_id} récupéré", flush=True)
        titles = []
        # A ToC title such as "4.2 Foo" appears in the page-extracted text
        # with the number and title on separate lines, hence the rewrite to
        # "4.2\nFoo"; keep only titles actually present in the text.
        for level, title, page in toc:
            if title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
                titles.append('\n'.join(title.strip().split(" ", 1)))
        return extract_sections(text, titles)
    else:
        raise HTTPException(status_code=400, detail="Document ID format invalid !")