import gradio as gr
from pathlib import Path
import os
from transformers import AutoTokenizer, AutoModel, AutoModelForQuestionAnswering, pipeline
from transformers import MarianMTModel, MarianTokenizer
from nltk.tokenize import sent_tokenize
from nltk.tokenize import LineTokenizer
import math
import torch
import nltk
import numpy as np
import time
import hashlib
from tqdm import tqdm
import textract
from scipy.special import softmax
import pandas as pd
from datetime import datetime

device = "cuda:0" if torch.cuda.is_available() else "cpu"

nltk.download('punkt')
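# Overall flow of this Space (as implemented below): the user's Spanish question and
# document are translated to English with MarianMT, candidate passages are retrieved with
# dense embeddings (sentence-transformers/multi-qa-mpnet-base-dot-v1), an extractive QA
# model (deepset/roberta-large-squad2) picks an answer span, and the result is translated
# back to Spanish before it is shown in the Gradio UI.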
docs = None

# Define the models:
# Translation (Spanish <-> English)
mname = "Helsinki-NLP/opus-mt-es-en"
tokenizer_es_en = MarianTokenizer.from_pretrained(mname)
model_es_en = MarianMTModel.from_pretrained(mname)
model_es_en.to(device)

mname = "Helsinki-NLP/opus-mt-en-es"
tokenizer_en_es = MarianTokenizer.from_pretrained(mname)
model_en_es = MarianMTModel.from_pretrained(mname)
model_en_es.to(device)

lt = LineTokenizer()

# Question answering: dense retrieval encoder + extractive QA model
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/multi-qa-mpnet-base-dot-v1")
model = AutoModel.from_pretrained("sentence-transformers/multi-qa-mpnet-base-dot-v1").to(device).eval()
tokenizer_ans = AutoTokenizer.from_pretrained("deepset/roberta-large-squad2")
model_ans = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-large-squad2").to(device).eval()

if device == 'cuda:0':
    pipe = pipeline("question-answering", model=model_ans, tokenizer=tokenizer_ans, device=0)
else:
    pipe = pipeline("question-answering", model=model_ans, tokenizer=tokenizer_ans)
def validate_dataset(dataset):
    global docs
    docs = None  # clear it out if dataset is modified
    docs_ready = dataset.iloc[-1, 0] != ""
    if docs_ready:
        return "✨Listo✨"
    else:
        return "⚠️Esperando documentos..."
def traducir_parrafos(parrafos, tokenizer, model, tam_bloque=8):
    parrafos_traducidos = []
    for parrafo in parrafos:
        frases = sent_tokenize(parrafo)
        batches = math.ceil(len(frases) / tam_bloque)
        traducido = []
        for i in range(batches):
            bloque_enviado = frases[i*tam_bloque:(i+1)*tam_bloque]
            model_inputs = tokenizer(bloque_enviado, return_tensors="pt",
                                     padding=True, truncation=True,
                                     max_length=500).to(device)
            with torch.no_grad():
                bloque_traducido = model.generate(**model_inputs)
            traducido += bloque_traducido
        traducido = [tokenizer.decode(t, skip_special_tokens=True) for t in traducido]
        parrafos_traducidos += [" ".join(traducido)]
    return parrafos_traducidos
def traducir_es_en(texto):
    parrafos = lt.tokenize(texto)
    par_tra = traducir_parrafos(parrafos, tokenizer_es_en, model_es_en)
    return "\n".join(par_tra)

def traducir_en_es(texto):
    parrafos = lt.tokenize(texto)
    par_tra = traducir_parrafos(parrafos, tokenizer_en_es, model_en_es)
    return "\n".join(par_tra)
def request_pathname(files):
    if files is None:
        return [[]]
    return [[file.name, file.name.split('/')[-1]] for file in files]
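# Query and passage embeddings are taken from the [CLS] token of
# sentence-transformers/multi-qa-mpnet-base-dot-v1, which is trained for dot-product
# retrieval, so raw (unnormalised) dot products are used as relevance scores further below.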
def cls_pooling(model_output):
    return model_output.last_hidden_state[:, 0]

def encode_query(query):
    encoded_input = tokenizer(query, truncation=True, return_tensors='pt').to(device)
    with torch.no_grad():
        model_output = model(**encoded_input, return_dict=True)
    embeddings = cls_pooling(model_output)
    return embeddings.cpu()
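# encode_docs chunks the document into windows of roughly `maxlen` words that overlap their
# neighbours by about `stride` words, so an answer straddling a chunk boundary still appears
# whole in at least one chunk. Each chunk is embedded with the retrieval encoder, and the
# embeddings, text spans and file name are cached to .npy files for reuse on later queries.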
def encode_docs(docs, maxlen=64, stride=32):
    encoded_input = []
    embeddings = []
    spans = []
    file_names = []
    name, text = docs
    text = text.split(" ")

    if len(text) < maxlen:
        temp_text = " ".join(text)
        encoded_input.append(tokenizer(temp_text, return_tensors='pt', truncation=True).to(device))
        spans.append(temp_text)
        file_names.append(name)
    else:
        num_iters = int(len(text) / maxlen) + 1
        for i in range(num_iters):
            if i == 0:
                temp_text = " ".join(text[i*maxlen:(i+1)*maxlen + stride])
            else:
                temp_text = " ".join(text[(i-1)*maxlen:i*maxlen][-stride:] + text[i*maxlen:(i+1)*maxlen])
            encoded_input.append(tokenizer(temp_text, return_tensors='pt', truncation=True).to(device))
            spans.append(temp_text)
            file_names.append(name)

    with torch.no_grad():
        for encoded in tqdm(encoded_input):
            model_output = model(**encoded, return_dict=True)
            embeddings.append(cls_pooling(model_output))

    embeddings = np.float32(torch.stack(embeddings).transpose(0, 1).cpu())

    np.save("emb_{}.npy".format(name), dict(zip(list(range(len(embeddings))), embeddings)))
    np.save("spans_{}.npy".format(name), dict(zip(list(range(len(spans))), spans)))
    np.save("file_{}.npy".format(name), dict(zip(list(range(len(file_names))), file_names)))

    return embeddings, spans, file_names
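# predict() ties everything together: it translates the question to English, looks for a
# cached CSV of answers for this (question, document) pair, loads or computes the document
# chunk embeddings, scores chunks against the query with a dot product (MIPS), applies a
# softmax over the top-k scores, runs extractive QA on the most likely chunks, and finally
# translates the answer, context and probability strings back to Spanish for display.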
def predict(query, data):
    query = traducir_es_en(query)
    # strip the suffix that Gradio appends to the uploaded temp file name
    name_to_save = data.name.split("/")[-1].split(".")[0][:-8]
    k = 2
    st = str([query, name_to_save])
    st_hashed = str(hashlib.sha256(st.encode()).hexdigest())  # stable cache key, just to speed up loading the examples
    hist = st + " " + st_hashed
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")

    try:  # if the same question was already asked for this document, load the cached answer
        # sha256 key: unlike Python's hash(), it is stable across runs
        df = pd.read_csv("{}.csv".format(st_hashed))
        list_outputs = []
        for i in range(k):
            temp = df.iloc[i]
            tupla = (traducir_en_es(temp.Respuesta),
                     traducir_en_es(temp.Contexto),
                     traducir_en_es(temp.Probabilidades))
            list_outputs.append(tupla)
        return list_outputs[0]
    except Exception as e:
        print(e)
        print(st)

    if name_to_save + ".txt" in os.listdir():  # if the document was already used, load its embeddings
        doc_emb = np.load('emb_{}.npy'.format(name_to_save), allow_pickle='TRUE').item()
        doc_text = np.load('spans_{}.npy'.format(name_to_save), allow_pickle='TRUE').item()
        file_names_dicto = np.load('file_{}.npy'.format(name_to_save), allow_pickle='TRUE').item()

        doc_emb = np.array(list(doc_emb.values())).reshape(-1, 768)
        doc_text = list(doc_text.values())
        file_names = list(file_names_dicto.values())
    else:
        text = textract.process("{}".format(data.name)).decode('utf8')
        text = text.replace("\r", " ")
        text = text.replace("\n", " ")
        text = text.replace(" . ", " ")

        text = traducir_es_en(text)

        doc_emb, doc_text, file_names = encode_docs((name_to_save, text), maxlen=64, stride=32)
        doc_emb = doc_emb.reshape(-1, 768)

        with open("{}.txt".format(name_to_save), "w", encoding="utf-8") as f:
            f.write(text)

    # once embeddings are calculated, run MIPS
    start = time.time()
    query_emb = encode_query(query)

    scores = np.matmul(query_emb, doc_emb.transpose(1, 0))[0].tolist()
    doc_score_pairs = list(zip(doc_text, scores, file_names))
    doc_score_pairs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)

    probs = softmax(sorted(scores, reverse=True)[:k])
    table = {"Contexto": [], "Respuesta": [], "Probabilidades": []}

    # get answers for each pair of question (from user) and top best passages
    for i, (passage, _, names) in enumerate(doc_score_pairs[:k]):
        passage = passage.replace("\n", "")
        # passage = passage.replace(" . ", " ")

        if probs[i] > 0.1 or (i < 3 and probs[i] > 0.05):  # generate answers only for the more likely passages
            QA = {'question': query, 'context': passage}
            ans = pipe(QA)
            probabilities = "P(a|p): {}, P(a|p,q): {}, P(p|q): {}".format(round(ans["score"], 5),
                                                                          round(ans["score"]*probs[i], 5),
                                                                          round(probs[i], 5))
            table["Contexto"].append(passage)
            table["Respuesta"].append(str(ans["answer"]).upper())
            table["Probabilidades"].append(probabilities)
        else:
            table["Contexto"].append(passage)
            table["Respuesta"].append("no_answer_calculated")
            table["Probabilidades"].append("P(p|q): {}".format(round(probs[i], 5)))

    # format the answers for output and cache them (in case the same question is asked again for the same document)
    df = pd.DataFrame(table)
    print(df)
    print("time: " + str(time.time() - start))

    with open("HISTORY.txt", "a", encoding="utf-8") as f:
        f.write(hist)
        f.write(" " + str(current_time))
        f.write("\n")

    df.to_csv("{}.csv".format(st_hashed), index=False)

    list_outputs = []
    for i in range(k):
        temp = df.iloc[i]
        tupla = (traducir_en_es(temp.Respuesta),
                 traducir_en_es(temp.Contexto),
                 traducir_en_es(temp.Probabilidades))
        list_outputs.append(tupla)
    return list_outputs[0]
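# Gradio UI: a single file upload plus a question box. predict() returns a
# (respuesta, contexto, probabilidades) tuple in Spanish, which is mapped onto the three
# Markdown components (answer, context, prob) declared below.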
with gr.Blocks() as demo:
    gr.Markdown("""
# Document Question and Answer adaptado al castellano por Pablo Ascorbe.

Este espacio ha sido clonado y adaptado de: https://huggingface.co/spaces/whitead/paper-qa

La idea es utilizar modelos preentrenados de HuggingFace (recuperación de pasajes con
"sentence-transformers/multi-qa-mpnet-base-dot-v1" y extracción de respuestas con
"deepset/roberta-large-squad2") y responder las preguntas en inglés; para ello, es necesario
traducir primero los textos del castellano al inglés y luego volver a traducir la respuesta
en sentido contrario.

## Instrucciones:

Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee.
""")
    file = gr.File(label="Sus documentos subidos (PDF o txt)")

    # dataset = gr.Dataframe(
    #     headers=["filepath", "citation string"],
    #     datatype=["str", "str"],
    #     col_count=(2, "fixed"),
    #     interactive=True,
    #     label="Documentos y citas"
    # )
    # buildb = gr.Textbox("⚠️Esperando documentos...",
    #                     label="Estado", interactive=False, show_label=True)
    # dataset.change(validate_dataset, inputs=[dataset], outputs=[buildb])
    # uploaded_files.change(request_pathname, inputs=[uploaded_files], outputs=[dataset])

    query = gr.Textbox(placeholder="Introduzca su pregunta aquí...", label="Pregunta")
    ask = gr.Button("Preguntar")

    gr.Markdown("## Respuesta")
    answer = gr.Markdown(label="Respuesta")
    prob = gr.Markdown(label="Probabilidades")

    with gr.Accordion("Contexto", open=False):
        gr.Markdown("### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:")
        context = gr.Markdown(label="Contexto")

    # ask.click(fn=do_ask, inputs=[query, buildb, dataset], outputs=[answer, context])
    ask.click(fn=predict, inputs=[query, file], outputs=[answer, context, prob])

    examples = gr.Examples(examples=[["¿Cuándo suelen comenzar las adicciones?", "Entrevista Miguel Ruiz.txt"]],
                           inputs=[query, file])

demo.queue(concurrency_count=20)
demo.launch(show_error=True)
# --- Earlier, commented-out versions kept for reference ---
# iface = gr.Interface(fn=predict,
#                      inputs=[gr.inputs.Textbox(default="What is Open-domain question answering?"),
#                              gr.inputs.File(),
#                              ],
#                      outputs=[
#                              gr.outputs.Carousel(['text']),
#                              ],
#                      description=description,
#                      title=title,
#                      allow_flagging="manual", flagging_options=["correct", "wrong"],
#                      allow_screenshot=False)
# iface.launch(enable_queue=True, show_error=True)
# Question answering (older approach used by a previous version of this Space)
# question_answerer = pipeline("question-answering", model='distilbert-base-cased-distilled-squad')
# def do_ask(question, button, dataset):
#     global docs
#     docs_ready = dataset.iloc[-1, 0] != ""
#     if button == "✨Listo✨" and docs_ready:
#         for _, row in dataset.iterrows():
#             path = row['filepath']
#             text = Path(f'{path}').read_text()
#             text_en = traducir_es_en(text)
#             QA_input = {
#                 'question': traducir_es_en(question),
#                 'context': text_en
#             }
#             return traducir_en_es(question_answerer(QA_input)['answer'])
#     else:
#         return ""

# # def do_ask(question, button, dataset, progress=gr.Progress()):
# #     global docs
# #     docs_ready = dataset.iloc[-1, 0] != ""
# #     if button == "✨Listo✨" and docs_ready:
# #         if docs is None:  # don't want to rebuild index if it's already built
# #             import paperqa
# #             docs = paperqa.Docs()
# #             # dataset is pandas dataframe
# #             for _, row in dataset.iterrows():
# #                 key = None
# #                 if ',' not in row['citation string']:
# #                     key = row['citation string']
# #                 docs.add(row['filepath'], row['citation string'], key=key)
# #     else:
# #         return ""
# #     progress(0, "Construyendo índices...")
# #     docs._build_faiss_index()
# #     progress(0.25, "Encolando...")
# #     result = docs.query(question)
# #     progress(1.0, "¡Hecho!")
# #     return result.formatted_answer, result.context
# with gr.Blocks() as demo:
#     gr.Markdown("""
# # Document Question and Answer adaptado al castellano por Pablo Ascorbe.
# Este espacio ha sido clonado y adaptado de: https://huggingface.co/spaces/whitead/paper-qa
# La idea es utilizar un modelo preentrenado de HuggingFace como "distilbert-base-cased-distilled-squad"
# y responder las preguntas en inglés, para ello, será necesario hacer primero una traducción de los textos en castellano
# a inglés y luego volver a traducir en sentido contrario.
# ## Instrucciones:
# Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee.
# """)
#     uploaded_files = gr.File(
#         label="Sus documentos subidos (PDF o txt)", file_count="multiple")
#     dataset = gr.Dataframe(
#         headers=["filepath", "citation string"],
#         datatype=["str", "str"],
#         col_count=(2, "fixed"),
#         interactive=True,
#         label="Documentos y citas"
#     )
#     buildb = gr.Textbox("⚠️Esperando documentos...",
#                         label="Estado", interactive=False, show_label=True)
#     dataset.change(validate_dataset, inputs=[dataset], outputs=[buildb])
#     uploaded_files.change(request_pathname, inputs=[uploaded_files], outputs=[dataset])
#     query = gr.Textbox(
#         placeholder="Introduzca su pregunta aquí...", label="Pregunta")
#     ask = gr.Button("Preguntar")
#     gr.Markdown("## Respuesta")
#     answer = gr.Markdown(label="Respuesta")
#     with gr.Accordion("Contexto", open=False):
#         gr.Markdown(
#             "### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:")
#         context = gr.Markdown(label="Contexto")
#     # ask.click(fn=do_ask, inputs=[query, buildb, dataset], outputs=[answer, context])
#     ask.click(fn=do_ask, inputs=[query, buildb, dataset], outputs=[answer])
# demo.queue(concurrency_count=20)
# demo.launch(show_error=True)