Source code for irspdf.ir_collection

import re
import os
import numpy as np
import pdfplumber
import snowballstemmer
from stop_words import get_stop_words
from collections import Counter, defaultdict


class IRCollection:
    """Builds a text IR collection from a set of pdf files.

    :param max_length: upper bound (exclusive) on the number of characters
        of a valid word
    :type max_length: int
    :param vocabulary: maps every word of the collection to its total
        frequency while reading, then to its integer id once indexed
    :type vocabulary: collections.Counter
    :param inverted_index: inverted index of the collection; once indexed
        it maps a word id to a list of ``(docname, frequency)`` pairs
    :type inverted_index: dict
    :param doc_length: length (number of indexed tokens) of every document
    :type doc_length: collections.Counter
    :param avg_doc_length: average length of the documents in the
        collection, ``0.0`` when the collection is empty
    :type avg_doc_length: float
    :param min_freq: min number of occurrences for a word to be kept in
        the vocabulary
    :type min_freq: int
    :param idf: inverse document frequency of all words, keyed by word id
    :type idf: dict
    :param stops: set of stopwords to be deleted from the vocabulary
    :type stops: set
    :param num_docs: total number of documents in the collection
    :type num_docs: int
    """

    # Every character outside this class is replaced by a space before the
    # text is split into tokens (used for documents and queries alike).
    _NON_TOKEN_CHARS = re.compile(r"[^a-zA-Z0-9,\-/]")

    def __init__(self, path=None):
        """Initialise max_length, min_freq and stops; if path is set to a
        value, builds the collection from the pdf files in the folder path.

        :param path: folder containing all pdf files used to build the
            collection
        :type path: str
        """
        self.max_length = 30
        self.min_freq = 5
        # A set gives O(1) membership tests in the per-token filter.
        self.stops = set(get_stop_words('en'))
        self.stemmer = snowballstemmer.stemmer('english')

        # Start from a valid empty collection so that BM25() and update()
        # are usable even when no folder has been indexed yet.
        self.vocabulary = Counter()
        self.inverted_index = {}
        self.doc_length = Counter()
        self.avg_doc_length = 0.0
        self.idf = {}
        self.num_docs = 0

        if path:
            self.build_collection(path)

    def _stem_tokens(self, text):
        """Normalise raw text into a list of stemmed, lower-case tokens.

        The text is lower-cased *before* stemming: the Snowball stemmer
        works on lower-case input, so stemming first would give capitalised
        words a different stem than their lower-case form.

        :param text: raw text of a page or of a query
        :type text: str
        :return: stemmed tokens, in order of appearance
        :rtype: list
        """
        cleaned = self._NON_TOKEN_CHARS.sub(" ", text).lower()
        return self.stemmer.stemWords(cleaned.split())

    def _refresh_avg_doc_length(self):
        """Recompute avg_doc_length from doc_length (0.0 when empty)."""
        count = len(self.doc_length)
        if count:
            self.avg_doc_length = sum(self.doc_length.values()) / count
        else:
            # An empty collection has no meaningful average; avoid dividing
            # by zero.
            self.avg_doc_length = 0.0

    def build_collection(self, path):
        """Builds the collection from the pdf files in the folder path.

        :param path: folder containing all pdf files used to build the
            collection
        :type path: str
        """
        self.vocabulary = Counter()
        self.read_all_pdfs(path)
        self.remove_low_freq()
        self.index_words()
        self.compute_idfs()
        self.compute_docs_lengths()

    def read_all_pdfs(self, path):
        """Extracts the text from all the pdf files in path.

        Resets the inverted index and the document count. A document is
        named after its file name without the ``.pdf`` extension.

        :param path: folder containing the pdf files
        :type path: str
        """
        self.inverted_index = defaultdict(Counter)
        self.num_docs = 0
        # Sorted so that the resulting word ids do not depend on the
        # arbitrary order in which the OS lists the directory.
        for file in sorted(os.listdir(path)):
            # splitext keeps dots inside the name ("report.v2.pdf" ->
            # "report.v2") and only accepts a real ".pdf" extension.
            docname, extension = os.path.splitext(file)
            if extension.lower() != '.pdf':
                continue
            self.num_docs += 1
            self.read_pdf(os.path.join(path, file), docname)

    def read_pdf(self, path, docname):
        """Reads a single pdf file, builds a document from it and updates
        the vocabulary and the inverted index.

        Expects the inverted index in its pre-indexing form (word ->
        Counter of documents), as set up by :meth:`read_all_pdfs`.

        :param path: pdf file location
        :type path: str
        :param docname: name that will be given to the document
        :type docname: str
        """
        print(f"Reading {path} as {docname}")
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                # Guard against a page that yields no text at all.
                page_text = page.extract_text() or ""
                for word in self._stem_tokens(page_text):
                    if (1 < len(word) < self.max_length
                            and word not in self.stops):
                        self.inverted_index[word][docname] += 1
                        self.vocabulary[word] += 1

    def remove_low_freq(self):
        """Deletes from the vocabulary (and the inverted index) the words
        that occur less than min_freq times.
        """
        rare_words = [word for word, count in self.vocabulary.items()
                      if count < self.min_freq]
        for word in rare_words:
            del self.vocabulary[word]
            del self.inverted_index[word]

    def index_words(self):
        """Exchanges words in the vocabulary and the inverted index with int.

        Ids are assigned by decreasing frequency (0 is the most frequent
        word). Afterwards ``vocabulary`` maps word -> id and
        ``inverted_index`` maps id -> list of ``(docname, frequency)``
        pairs sorted by decreasing frequency.
        """
        indexed = {}
        # most_common() returns a list, so reassigning vocabulary values
        # while looping over it is safe.
        for word_id, (word, _) in enumerate(self.vocabulary.most_common()):
            self.vocabulary[word] = word_id
            indexed[word_id] = self.inverted_index[word].most_common()
        self.inverted_index = indexed

    def get_idf(self, word):
        """Computes the smoothed idf of a single word.

        :param word: index of a word in the inverted_index
        :type word: int
        :return: idf of word
        :rtype: float
        """
        doc_freq = len(self.inverted_index[word])
        return 1 + np.log(self.num_docs / (1 + doc_freq))

    def compute_idfs(self):
        """Compute the idf of all words in the vocabulary."""
        self.idf = {word: self.get_idf(word) for word in self.inverted_index}

    def compute_docs_lengths(self):
        """Compute the length of all documents using the inverted index,
        then the average document length.
        """
        self.doc_length = Counter()
        for posting_list in self.inverted_index.values():
            for doc, freq in posting_list:
                self.doc_length[doc] += freq
        self._refresh_avg_doc_length()

    def score_BM25(self, word_id, doc, freq, k1, b):
        """Computes the BM25 score of a term in a document.

        :param word_id: id of the word in the inverted index
        :type word_id: int
        :param doc: document name
        :type doc: str
        :param freq: frequency of the word in the document
        :type freq: int
        :param k1: BM25 parameter must be a positive real value
        :type k1: float
        :param b: BM25 parameter must be in [0,1]
        :type b: float
        :return: BM25 score of the word in the document
        :rtype: float
        """
        length_ratio = self.doc_length[doc] / self.avg_doc_length
        score = self.idf[word_id] * (k1 + 1) * freq
        score /= freq + k1 * ((1 - b) + b * length_ratio)
        return score

    def BM25(self, query, k1=1.2, b=0.75, k=1000, display=True):
        """Compute the BM25 score of all the documents with respect to a
        query.

        :param query: the query as a string
        :param k1: BM25 parameter must be a positive real value
        :param b: BM25 parameter must be in [0,1]
        :param k: max number of documents to print when display is true;
            the returned counter always contains every scored document
        :param display: if set to true will print top-k document with
            their score
        :return: A counter of the documents and their BM25 score
        :rtype: collections.Counter
        """
        query = self._NON_TOKEN_CHARS.sub(" ", query)
        results = Counter()
        # Same normalisation as at indexing time, so query terms and
        # indexed terms share the same stems.
        for word in self._stem_tokens(query):
            if word not in self.vocabulary:
                continue
            word_id = self.vocabulary[word]
            for doc, freq in self.inverted_index[word_id]:
                results[doc] += self.score_BM25(word_id, doc, freq, k1, b)
        if display:
            for doc, score in results.most_common(k):
                print(f'{doc} : {score}')
            if not results:
                print(f'Not document found for the query : {query}')
        return results

    def update(self, collection):
        """Updates the IRCollection with documents from a new IRCollection.

        WARNING: The documents in the new IRCollection must be different
        from the documents in the original IRCollection.

        The other collection is left untouched: its posting lists are
        copied, never shared.

        :param collection: IRCollection object that contains the documents
            to update the collection with
        :type collection: irspdf.IRCollection
        """
        # Existing ids are 0..len-1, so len is the first free id.
        next_id = len(self.vocabulary)
        for word, other_id in collection.vocabulary.items():
            postings = collection.inverted_index[other_id]
            if word in self.vocabulary:
                self.inverted_index[self.vocabulary[word]].extend(postings)
            else:
                self.vocabulary[word] = next_id
                # Copy, otherwise a later update would also grow the list
                # owned by the other collection.
                self.inverted_index[next_id] = list(postings)
                next_id += 1

        for doc, length in collection.doc_length.items():
            self.doc_length[doc] = length

        # The idf depends on the total number of documents, so it has to
        # grow with the merged collection before idfs are recomputed.
        self.num_docs += collection.num_docs
        self._refresh_avg_doc_length()
        self.compute_idfs()