import json
import math
import re
from collections import defaultdict, Counter

import gradio as gr
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer

nltk.download("punkt")
nltk.download("wordnet")
nltk.download("punkt_tab")

# Minimal hand-picked stopword list, applied during indexing and VSM querying.
stop_words = {"a", "is", "the", "of", "all", "and", "to", "can", "be", "as",
              "once", "for", "at", "am", "are", "has", "have", "had", "up",
              "his", "her", "in", "on", "no", "we", "do"}

with open("docs.json", "r", encoding="utf-8") as f:
    docs_ds = json.load(f)
with open("queries.json", "r", encoding="utf-8") as f:
    queries_ds = json.load(f)

documents = {int(doc["doc_id"]): doc["text"] for doc in docs_ds}
# The query set is loaded alongside the documents; the UI below takes
# free-text queries directly.
queries = {int(q["query_id"]): q["text"] for q in queries_ds}

# Global index structures, populated by process_documents().
inverted_index = defaultdict(set)                          # stemmed term -> doc ids
positional_index = defaultdict(lambda: defaultdict(list))  # stemmed term -> doc id -> positions
tf_idf_vectors = defaultdict(dict)                         # doc id -> {lemma: tf * idf}
idf_scores = {}                                            # lemma -> idf


def process_documents(documents):
    """Build the inverted/positional indexes (stemmed terms, for Boolean and
    proximity queries) and the TF-IDF vectors (lemmatized terms, for the VSM)."""
    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    doc_freq = defaultdict(int)
    term_freqs = {}
    for doc_id, text in documents.items():
        words = word_tokenize(text.lower())
        filtered_words = [lemmatizer.lemmatize(w) for w in words
                          if w.isalnum() and w not in stop_words]
        term_counts = Counter(filtered_words)
        term_freqs[doc_id] = term_counts
        # Positions are indices into the filtered token stream, so proximity
        # distances are measured after stopword removal.
        for pos, word in enumerate(filtered_words):
            stemmed = stemmer.stem(word)
            inverted_index[stemmed].add(doc_id)
            positional_index[stemmed][doc_id].append(pos)
        for word in set(filtered_words):
            doc_freq[word] += 1
    total_docs = len(documents)
    for word, df in doc_freq.items():
        idf_scores[word] = math.log(total_docs / df)
    for doc_id, term_counts in term_freqs.items():
        tf_idf_vectors[doc_id] = {word: count * idf_scores[word]
                                  for word, count in term_counts.items()}


def execute_boolean_query(query, documents):
    """Evaluate a Boolean query left to right (no operator precedence).
    NOT acts as a unary prefix that complements the term following it."""
    tokens = query.lower().split()
    stemmer = PorterStemmer()
    operators = {'and', 'or', 'not'}
    term_stack = []
    operator_stack = []
    for token in tokens:
        if token in operators:
            operator_stack.append(token)
        else:
            stemmed_word = stemmer.stem(token)
            term_stack.append(inverted_index.get(stemmed_word, set()))
    # Resolve every NOT first: in a well-formed alternating query, the NOT at
    # operator index i prefixes the term at the same index in term_stack.
    while 'not' in operator_stack:
        idx = operator_stack.index('not')
        term_stack[idx] = set(documents.keys()) - term_stack[idx]
        operator_stack.pop(idx)
    # Then fold the remaining AND/OR operators left to right; the length guard
    # prevents an IndexError on malformed queries such as a trailing operator.
    while operator_stack and len(term_stack) >= 2:
        op = operator_stack.pop(0)
        left = term_stack.pop(0)
        right = term_stack.pop(0)
        if op == 'and':
            term_stack.insert(0, left & right)
        elif op == 'or':
            term_stack.insert(0, left | right)
    return sorted(term_stack[0]) if term_stack else []


def execute_proximity_query(query):
    """Answer queries of the form "word1 word2 /k": documents where the two
    terms occur within k positions of each other."""
    match = re.match(r'(\w+)\s+(\w+)\s*/\s*(\d+)', query)
    if not match:
        return []
    word1, word2, k = match.groups()
    k = int(k)
    stemmer = PorterStemmer()
    word1 = stemmer.stem(word1.lower())
    word2 = stemmer.stem(word2.lower())
    result_docs = set()
    if word1 in positional_index and word2 in positional_index:
        for doc_id in positional_index[word1]:
            if doc_id in positional_index[word2]:
                positions1 = positional_index[word1][doc_id]
                positions2 = positional_index[word2][doc_id]
                if any(0 < abs(p1 - p2) <= k
                       for p1 in positions1 for p2 in positions2):
                    result_docs.add(doc_id)
    return sorted(result_docs)


def evaluate_cosine_similarity_score(vec1, vec2):
    """Cosine similarity between two sparse term-weight vectors."""
    common = set(vec1.keys()) & set(vec2.keys())
    dot_product = sum(vec1[k] * vec2[k] for k in common)
    norm1 = math.sqrt(sum(v ** 2 for v in vec1.values()))
    norm2 = math.sqrt(sum(v ** 2 for v in vec2.values()))
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)
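
# Worked example (a sketch with made-up weights, not taken from docs.json):
# the only shared term below is "heart", so the dot product is 2.0; the norms
# are sqrt(5) ~= 2.236 and 1.0, giving a similarity of 2.0 / 2.236 ~= 0.894.
assert abs(evaluate_cosine_similarity_score(
    {"heart": 2.0, "disease": 1.0}, {"heart": 1.0}) - 0.8944) < 1e-3
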
def process_query(user_input_query):
    """Turn a free-text query into a TF-IDF vector over lemmatized terms."""
    lemmatizer = WordNetLemmatizer()
    tokens = word_tokenize(user_input_query.lower())
    filtered = [lemmatizer.lemmatize(w) for w in tokens
                if w.isalnum() and w not in stop_words]
    query_counts = Counter(filtered)
    # Terms unseen at indexing time get IDF 0 and so contribute nothing.
    return {w: query_counts[w] * idf_scores.get(w, 0) for w in query_counts}


def execute_vsm_query(user_input_query, alpha=0.001):
    """Rank documents by cosine similarity to the query vector, keeping only
    those scoring at least alpha."""
    query_vector = process_query(user_input_query)
    scores = {}
    for doc_id, doc_vector in tf_idf_vectors.items():
        sim = evaluate_cosine_similarity_score(query_vector, doc_vector)
        if sim >= alpha:
            scores[doc_id] = sim
    return sorted(scores, key=scores.get, reverse=True)


process_documents(documents)


def chatbot_fn(query, method):
    if not query:
        return "Query cannot be empty"
    if method == "Boolean":
        result = execute_boolean_query(query, documents)
    elif method == "Proximity":
        result = execute_proximity_query(query)
    elif method == "Vector Space Model":
        result = execute_vsm_query(query)
    else:
        # Gradio passes None if no method is selected; avoid an unbound result.
        return "Please select a retrieval method"
    return f"Result-set: {result}"


iface = gr.Interface(
    fn=chatbot_fn,
    inputs=["text", gr.Radio(["Boolean", "Proximity", "Vector Space Model"],
                             label="Method")],
    outputs="text",
    title="Information Retrieval Chatbot",
)
iface.launch()
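
# Example inputs for the UI (hypothetical terms; actual hits depend on the
# contents of docs.json):
#   Boolean:            heart AND disease
#   Boolean with NOT:   heart AND NOT attack
#   Proximity:          heart attack /3    (both terms within 3 positions)
#   Vector Space Model: symptoms of heart disease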