# mojad121's picture
# Update app.py
# aeffade verified
import nltk
import os
import json
import math
import re
import gradio as gr
from collections import defaultdict, Counter
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
# NLTK data packages fetched at import time, in this order.
for _resource in ("punkt", "wordnet", "punkt_tab"):
    nltk.download(_resource)

# Tokens in this set are discarded before indexing and before query weighting.
stop_words = {
    "a", "is", "the", "of", "all", "and", "to", "can", "be", "as", "once",
    "for", "at", "am", "are", "has", "have", "had", "up", "his", "her", "in",
    "on", "no", "we", "do",
}

# Raw corpus and query collections, read from JSON files in the working directory.
with open("docs.json", "r", encoding="utf-8") as f:
    docs_ds = json.load(f)
with open("queries.json", "r", encoding="utf-8") as f:
    queries_ds = json.load(f)

# Re-key both collections by their integer id.
documents = {int(entry["doc_id"]): entry["text"] for entry in docs_ds}
queries = {int(entry["query_id"]): entry["text"] for entry in queries_ds}

# Index structures, all filled in by process_documents():
#   inverted_index:   stemmed term -> set of doc ids
#   positional_index: stemmed term -> doc id -> list of token positions
#   tf_idf_vectors:   doc id -> {lemma: tf * idf}
#   idf_scores:       lemma -> inverse document frequency
inverted_index = defaultdict(set)
positional_index = defaultdict(lambda: defaultdict(list))
tf_idf_vectors = defaultdict(dict)
idf_scores = {}
def process_documents(documents):
    """Populate the module-level indexes from a ``{doc_id: text}`` mapping.

    Each document is lower-cased, tokenized, stripped of non-alphanumeric
    tokens and stop words, and lemmatized. The resulting lemmas feed two
    families of structures:

    * ``inverted_index`` / ``positional_index`` are keyed by the *stem* of
      each lemma; positions are indexes into the filtered lemma list.
    * ``idf_scores`` / ``tf_idf_vectors`` are keyed by the lemma itself, with
      ``idf = log(N / df)`` and the document weight ``tf * idf``.

    Returns nothing; all results are written to the module-level containers.
    """
    porter = PorterStemmer()
    wordnet = WordNetLemmatizer()
    document_frequency = Counter()
    counts_by_doc = {}

    for doc_id, text in documents.items():
        lemmas = [
            wordnet.lemmatize(token)
            for token in word_tokenize(text.lower())
            if token.isalnum() and token not in stop_words
        ]
        counts_by_doc[doc_id] = Counter(lemmas)

        # Boolean and proximity indexes use the stemmed form of each lemma.
        for position, lemma in enumerate(lemmas):
            key = porter.stem(lemma)
            inverted_index[key].add(doc_id)
            positional_index[key][doc_id].append(position)

        # Each distinct lemma counts once per document.
        document_frequency.update(set(lemmas))

    corpus_size = len(documents)
    idf_scores.update(
        (lemma, math.log(corpus_size / df))
        for lemma, df in document_frequency.items()
    )

    for doc_id, counts in counts_by_doc.items():
        tf_idf_vectors[doc_id] = {
            lemma: tf * idf_scores[lemma] for lemma, tf in counts.items()
        }
def execute_boolean_query(query, documents):
    """Evaluate a Boolean query against ``inverted_index``.

    The query is lower-cased and split on whitespace. The tokens ``and``,
    ``or`` and ``not`` are operators; every other token is a search term.
    ``not`` negates the term that follows it, after which ``and`` / ``or``
    are applied strictly left to right (no precedence, no parentheses).

    Terms are lemmatized and then stemmed, which is the normalization
    process_documents() applies when it builds the index keys.

    Args:
        query: The Boolean query string.
        documents: Mapping of doc id to text; only its keys are used, as the
            universe that ``not`` complements against.

    Returns:
        A sorted list of matching doc ids. An empty list is returned for an
        empty query and for a malformed one (an operator with no operand, or
        a trailing ``not``).
    """
    binary_operators = []
    operands = []  # (raw term, negated?) in query order
    negate_next = False
    for token in query.lower().split():
        if token == 'not':
            # Toggling makes "not not x" equivalent to "x".
            negate_next = not negate_next
        elif token in ('and', 'or'):
            binary_operators.append(token)
        else:
            operands.append((token, negate_next))
            negate_next = False

    # Validate before touching the index: a dangling NOT, or too few terms
    # for the AND/OR operators, cannot be evaluated. This also covers the
    # empty query (zero terms, zero operators).
    if negate_next or len(operands) <= len(binary_operators):
        return []

    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    all_doc_ids = set(documents)

    def matching_docs(term, negated):
        # Index keys are stem(lemmatize(word)); stemming the raw token alone
        # can produce a different key whenever the lemmatizer rewrites the word.
        key = stemmer.stem(lemmatizer.lemmatize(term))
        found = inverted_index.get(key, set())
        return all_doc_ids - found if negated else found

    resolved = [matching_docs(term, negated) for term, negated in operands]

    # Left-to-right fold. Any surplus terms beyond what the operators consume
    # are not combined into the result.
    result = resolved[0]
    for operator, right in zip(binary_operators, resolved[1:]):
        if operator == 'and':
            result = result & right
        else:
            result = result | right
    return sorted(result)
def execute_proximity_query(query):
    """Find documents where two terms occur within ``k`` positions of each other.

    The query must look like ``word1 word2 / k`` (whitespace around the slash
    is optional). Positions are those recorded in ``positional_index``, i.e.
    indexes into each document's token list after stop-word and
    non-alphanumeric filtering.

    Both words are lemmatized and then stemmed, which is the normalization
    process_documents() applies when it builds the index keys.

    Args:
        query: The proximity query string.

    Returns:
        A sorted list of doc ids containing at least one pair of occurrences
        whose positions differ by more than 0 and at most ``k``. An empty list
        is returned when the query does not match the expected format or
        either term is not indexed.
    """
    # re.match is anchored at the start, so strip stray leading whitespace.
    match = re.match(r'(\w+)\s+(\w+)\s*/\s*(\d+)', query.strip())
    if not match:
        return []

    first_word, second_word, distance = match.groups()
    k = int(distance)

    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    # Index keys are stem(lemmatize(word)); stemming the raw word alone can
    # produce a different key whenever the lemmatizer rewrites the word.
    term1 = stemmer.stem(lemmatizer.lemmatize(first_word.lower()))
    term2 = stemmer.stem(lemmatizer.lemmatize(second_word.lower()))

    # .get avoids inserting empty entries into the defaultdict for unknown terms.
    postings1 = positional_index.get(term1)
    postings2 = positional_index.get(term2)
    if not postings1 or not postings2:
        return []

    result_docs = set()
    for doc_id in postings1.keys() & postings2.keys():
        positions1 = postings1[doc_id]
        positions2 = postings2[doc_id]
        # "0 <" skips a position paired with itself when both terms are the same.
        if any(0 < abs(p1 - p2) <= k for p1 in positions1 for p2 in positions2):
            result_docs.add(doc_id)
    return sorted(result_docs)
def evaluate_cosine_similarity_score(vec1, vec2):
    """Return the cosine similarity of two sparse ``{term: weight}`` vectors.

    Only terms present in both vectors contribute to the dot product. If
    either vector has zero magnitude (including an empty vector) the result
    is ``0.0``.
    """

    def magnitude(vector):
        return math.sqrt(sum(weight**2 for weight in vector.values()))

    shared_terms = set(vec1) & set(vec2)
    numerator = sum(vec1[term] * vec2[term] for term in shared_terms)

    length1 = magnitude(vec1)
    length2 = magnitude(vec2)
    if not (length1 and length2):
        return 0.0
    return numerator / (length1 * length2)
def process_query(user_input_query):
    """Turn a free-text query into a ``{lemma: tf * idf}`` vector.

    The query is lower-cased, tokenized, filtered (alphanumeric tokens that
    are not stop words) and lemmatized. Lemmas missing from ``idf_scores``
    get a weight of 0.
    """
    wordnet = WordNetLemmatizer()
    term_frequency = Counter(
        wordnet.lemmatize(token)
        for token in word_tokenize(user_input_query.lower())
        if token.isalnum() and token not in stop_words
    )

    weights = {}
    for lemma, frequency in term_frequency.items():
        weights[lemma] = frequency * idf_scores.get(lemma, 0)
    return weights
def execute_vsm_query(user_input_query, alpha=0.001):
    """Rank documents by cosine similarity to the query.

    Args:
        user_input_query: Free-text query.
        alpha: Minimum similarity a document needs to be included.

    Returns:
        Doc ids whose similarity is at least ``alpha``, most similar first.
    """
    query_vector = process_query(user_input_query)

    similarities = {
        doc_id: evaluate_cosine_similarity_score(query_vector, doc_vector)
        for doc_id, doc_vector in tf_idf_vectors.items()
    }
    hits = [
        (doc_id, similarity)
        for doc_id, similarity in similarities.items()
        if similarity >= alpha
    ]
    # Stable sort: documents with equal scores keep their index order.
    hits.sort(key=lambda hit: hit[1], reverse=True)
    return [doc_id for doc_id, _ in hits]
# Build the inverted, positional and TF-IDF indexes once, when the module is
# loaded, so the query functions can read them.
process_documents(documents)
def chatbot_fn(query, method):
    """Run ``query`` with the selected retrieval method and format the result.

    Args:
        query: The user's query text.
        method: One of ``"Boolean"``, ``"Proximity"`` or
            ``"Vector Space Model"``.

    Returns:
        ``"Result-set: [...]"`` with the matching doc ids, or a short message
        when the query is empty/blank or no valid method was chosen.
    """
    if not query or not query.strip():
        return "Query cannot be empty"

    if method == "Boolean":
        result = execute_boolean_query(query, documents)
    elif method == "Proximity":
        result = execute_proximity_query(query)
    elif method == "Vector Space Model":
        result = execute_vsm_query(query)
    else:
        # Without this branch `result` would be unbound (e.g. when no radio
        # option is selected) and the return below would raise.
        return "Please select a retrieval method: Boolean, Proximity, or Vector Space Model"
    return f"Result-set: {result}"
# Web UI: a text box for the query plus a radio group choosing the retrieval
# method; chatbot_fn's return string is shown as plain text.
_method_selector = gr.Radio(
    ["Boolean", "Proximity", "Vector Space Model"],
    label="Method",
)
iface = gr.Interface(
    fn=chatbot_fn,
    inputs=["text", _method_selector],
    outputs="text",
    title="Information Retrieval Chatbot",
)
iface.launch()