| import nltk | |
| import os | |
| import json | |
| import math | |
| import re | |
| import gradio as gr | |
| from collections import defaultdict, Counter | |
| from nltk.tokenize import word_tokenize | |
| from nltk.stem import PorterStemmer, WordNetLemmatizer | |
# Fetch the NLTK data packages used by word_tokenize and WordNetLemmatizer.
for _nltk_resource in ("punkt", "wordnet", "punkt_tab"):
    nltk.download(_nltk_resource)

# Lower-cased tokens that are dropped before indexing and before scoring.
stop_words = {
    "a", "is", "the", "of", "all", "and", "to", "can",
    "be", "as", "once", "for", "at", "am", "are", "has",
    "have", "had", "up", "his", "her", "in", "on", "no",
    "we", "do",
}

# Raw corpus and query collections, read from JSON files in the working dir.
with open("docs.json", "r", encoding="utf-8") as f:
    docs_ds = json.load(f)
with open("queries.json", "r", encoding="utf-8") as f:
    queries_ds = json.load(f)

# Integer id -> text lookups built from the loaded records.
documents = dict((int(doc["doc_id"]), doc["text"]) for doc in docs_ds)
queries = dict((int(q["query_id"]), q["text"]) for q in queries_ds)

# Shared index structures; all of them are filled by process_documents().
# stem -> set of doc ids containing it
inverted_index = defaultdict(set)
# stem -> doc id -> list of token positions (after stop-word removal)
positional_index = defaultdict(lambda: defaultdict(list))
# doc id -> {lemma: tf * idf weight}
tf_idf_vectors = defaultdict(dict)
# lemma -> inverse document frequency
idf_scores = {}
def process_documents(documents):
    """Index every document and compute its TF-IDF vector.

    Populates the module-level ``inverted_index``, ``positional_index``,
    ``idf_scores`` and ``tf_idf_vectors`` in place and returns nothing.

    Each text is lower-cased and tokenized; non-alphanumeric tokens and
    stop words are dropped and the survivors are lemmatized. The boolean
    and positional indexes are keyed by the Porter stem of each lemma,
    while the IDF table and TF-IDF vectors are keyed by the lemma itself.

    Args:
        documents: Mapping of document id to raw document text.
    """
    porter = PorterStemmer()
    wordnet = WordNetLemmatizer()
    docs_containing = defaultdict(int)
    counts_by_doc = {}

    for doc_id, text in documents.items():
        lemmas = []
        for token in word_tokenize(text.lower()):
            if token.isalnum() and token not in stop_words:
                lemmas.append(wordnet.lemmatize(token))

        lemma_counts = Counter(lemmas)
        counts_by_doc[doc_id] = lemma_counts

        # Positions are offsets into the filtered token list, so removed
        # stop words and punctuation do not count towards distances.
        for position, lemma in enumerate(lemmas):
            stem = porter.stem(lemma)
            inverted_index[stem].add(doc_id)
            positional_index[stem][doc_id].append(position)

        # Each distinct lemma contributes one to its document frequency.
        for lemma in lemma_counts:
            docs_containing[lemma] += 1

    corpus_size = len(documents)
    idf_scores.update(
        (lemma, math.log(corpus_size / df))
        for lemma, df in docs_containing.items()
    )

    for doc_id, lemma_counts in counts_by_doc.items():
        weights = {}
        for lemma, frequency in lemma_counts.items():
            weights[lemma] = frequency * idf_scores[lemma]
        tf_idf_vectors[doc_id] = weights
def execute_boolean_query(query, documents):
    """Evaluate a whitespace-separated boolean query against the inverted index.

    The query is lower-cased and split on whitespace. ``and``, ``or`` and
    ``not`` are operators; every other token is a search term. Terms are
    lemmatized and then stemmed, the same normalisation applied when the
    index is built, so a word typed as it appears in a document is found.

    ``not`` complements the term that follows it. The binary operators are
    then applied strictly left to right with no precedence. Terms that are
    not joined to the expression by a binary operator are ignored.

    Args:
        query: Boolean query string, e.g. ``"cat and not dog"``.
        documents: Mapping of document id to text; its keys form the
            universe used to complement a set for ``not``.

    Returns:
        Sorted list of matching document ids. An empty list is returned
        when the query has no terms or is malformed (an operator without
        the operand it needs).
    """
    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    operators = {"and", "or", "not"}

    term_sets = []
    operator_tokens = []
    for token in query.lower().split():
        if token in operators:
            operator_tokens.append(token)
        else:
            key = stemmer.stem(lemmatizer.lemmatize(token))
            term_sets.append(inverted_index.get(key, set()))

    # A 'not' applies to the term whose index equals the number of binary
    # operators that precede it in the query.
    all_doc_ids = set(documents)
    binary_ops = []
    for op in operator_tokens:
        if op != "not":
            binary_ops.append(op)
            continue
        target = len(binary_ops)
        if target >= len(term_sets):
            return []  # dangling 'not' with no term to negate
        term_sets[target] = all_doc_ids - term_sets[target]

    if not term_sets:
        return []

    # Fold the binary operators left to right over the term sets.
    result = term_sets[0]
    for position, op in enumerate(binary_ops, start=1):
        if position >= len(term_sets):
            return []  # binary operator without a right-hand operand
        right = term_sets[position]
        result = (result & right) if op == "and" else (result | right)
    return sorted(result)
def execute_proximity_query(query):
    """Find documents where two words occur within ``k`` positions of each other.

    The query must have the form ``"word1 word2 /k"``. Surrounding
    whitespace is ignored. Both words are lower-cased, lemmatized and
    stemmed, the same normalisation applied when the positional index is
    built. Positions in that index are counted after stop-word removal.

    Args:
        query: Proximity query string such as ``"neural network /3"``.

    Returns:
        Sorted list of ids of documents in which some occurrence of the
        first word and some occurrence of the second word are at distinct
        positions at most ``k`` apart. Returns an empty list when the query
        does not match the expected form or either word is not indexed.
    """
    match = re.match(r'(\w+)\s+(\w+)\s*/\s*(\d+)', query.strip())
    if not match:
        return []
    word1, word2, k = match.groups()
    k = int(k)

    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    word1 = stemmer.stem(lemmatizer.lemmatize(word1.lower()))
    word2 = stemmer.stem(lemmatizer.lemmatize(word2.lower()))

    # .get avoids creating empty entries in the defaultdict-based index.
    postings1 = positional_index.get(word1)
    postings2 = positional_index.get(word2)
    if not postings1 or not postings2:
        return []

    result_docs = set()
    for doc_id in postings1.keys() & postings2.keys():
        positions1 = postings1[doc_id]
        positions2 = postings2[doc_id]
        # A distance of 0 is excluded so a word never pairs with itself.
        if any(0 < abs(p1 - p2) <= k for p1 in positions1 for p2 in positions2):
            result_docs.add(doc_id)
    return sorted(result_docs)
def evaluate_cosine_similarity_score(vec1, vec2):
    """Return the cosine similarity of two sparse vectors.

    Args:
        vec1: Mapping of term to numeric weight.
        vec2: Mapping of term to numeric weight.

    Returns:
        The dot product over the shared terms divided by the product of
        the two Euclidean norms, or ``0.0`` when either norm is zero.
    """
    def magnitude(vector):
        # Euclidean length over every weight in the vector.
        return math.sqrt(sum(weight ** 2 for weight in vector.values()))

    length1 = magnitude(vec1)
    length2 = magnitude(vec2)
    # An empty or all-zero vector has no direction to compare.
    if length1 == 0 or length2 == 0:
        return 0.0

    shared_terms = set(vec1).intersection(set(vec2))
    dot = sum(vec1[term] * vec2[term] for term in shared_terms)
    return dot / (length1 * length2)
def process_query(user_input_query):
    """Convert a free-text query into a TF-IDF weighted term vector.

    The query is lower-cased and tokenized; non-alphanumeric tokens and
    stop words are dropped and the rest are lemmatized.

    Args:
        user_input_query: Raw query string.

    Returns:
        Dict mapping each query lemma to its count in the query multiplied
        by its IDF score. Lemmas with no IDF score get a weight of 0.
    """
    wordnet = WordNetLemmatizer()

    lemmas = []
    for token in word_tokenize(user_input_query.lower()):
        if token.isalnum() and token not in stop_words:
            lemmas.append(wordnet.lemmatize(token))

    weights = {}
    for lemma, frequency in Counter(lemmas).items():
        weights[lemma] = frequency * idf_scores.get(lemma, 0)
    return weights
def execute_vsm_query(user_input_query, alpha=0.001):
    """Rank documents by cosine similarity to a free-text query.

    Args:
        user_input_query: Raw query string.
        alpha: Minimum similarity a document needs to be included.

    Returns:
        List of document ids whose similarity to the query is at least
        ``alpha``, ordered from most to least similar.
    """
    query_vector = process_query(user_input_query)

    ranked = []
    for doc_id, doc_vector in tf_idf_vectors.items():
        similarity = evaluate_cosine_similarity_score(query_vector, doc_vector)
        if similarity >= alpha:
            ranked.append((doc_id, similarity))

    # Stable sort: documents with equal scores keep their index order.
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return [doc_id for doc_id, _ in ranked]
# Build the inverted index, positional index, IDF table and TF-IDF vectors
# once when the module is loaded, so the query functions can use them.
process_documents(documents)
def chatbot_fn(query, method):
    """Run a query with the selected retrieval method and format the result.

    Args:
        query: Query text entered by the user; may be empty or ``None``.
        method: One of ``"Boolean"``, ``"Proximity"`` or
            ``"Vector Space Model"``; may be ``None`` when nothing has
            been selected.

    Returns:
        ``"Result-set: [...]"`` with the matching document ids, or a short
        message when the query is blank or no known method is selected.
    """
    # Treat missing and whitespace-only input the same way.
    if not query or not query.strip():
        return "Query cannot be empty"

    if method == "Boolean":
        result = execute_boolean_query(query, documents)
    elif method == "Proximity":
        result = execute_proximity_query(query)
    elif method == "Vector Space Model":
        result = execute_vsm_query(query)
    else:
        # No radio option chosen (None) or an unexpected value: answer
        # with a message instead of failing on an unbound result.
        return "Please select a retrieval method"
    return f"Result-set: {result}"
# Radio selector for the retrieval strategy passed to chatbot_fn as `method`.
method_choices = ["Boolean", "Proximity", "Vector Space Model"]
method_selector = gr.Radio(method_choices, label="Method")

# Text box + method selector in, plain text out.
iface = gr.Interface(
    fn=chatbot_fn,
    inputs=["text", method_selector],
    outputs="text",
    title="Information Retrieval Chatbot",
)

# Start the Gradio app.
iface.launch()