# Natural Language Toolkit: Concordance Application # # Copyright (C) 2001-2012 NLTK Project # Author: Sumukh Ghodke # URL: # For license information, see LICENSE.TXT import re import threading import tkFont from Tkinter import (Tk, Button, END, Entry, Frame, IntVar, LEFT, Label, Menu, OptionMenu, SUNKEN, Scrollbar, StringVar, Text) from nltk.corpus import (cess_cat, brown, nps_chat, treebank, sinica_treebank, alpino, indian, floresta, mac_morpho, cess_esp) from nltk.util import in_idle from nltk.draw.util import ShowText WORD_OR_TAG = '[^/ ]+' BOUNDARY = r'\b' CORPUS_LOADED_EVENT = '<>' SEARCH_TERMINATED_EVENT = '<>' SEARCH_ERROR_EVENT = '<>' ERROR_LOADING_CORPUS_EVENT = '<>' # NB All corpora must be specified in a lambda expression so as not to be # loaded when the module is imported. _DEFAULT = 'English: Brown Corpus (Humor, simplified)' _CORPORA = { 'Catalan: CESS-CAT Corpus (simplified)': lambda: cess_cat.tagged_sents(simplify_tags=True), 'English: Brown Corpus': lambda: brown.tagged_sents(), 'English: Brown Corpus (simplified)': lambda: brown.tagged_sents(simplify_tags=True), 'English: Brown Corpus (Press, simplified)': lambda: brown.tagged_sents(categories=['news', 'editorial', 'reviews'], simplify_tags=True), 'English: Brown Corpus (Religion, simplified)': lambda: brown.tagged_sents(categories='religion', simplify_tags=True), 'English: Brown Corpus (Learned, simplified)': lambda: brown.tagged_sents(categories='learned', simplify_tags=True), 'English: Brown Corpus (Science Fiction, simplified)': lambda: brown.tagged_sents(categories='science_fiction', simplify_tags=True), 'English: Brown Corpus (Romance, simplified)': lambda: brown.tagged_sents(categories='romance', simplify_tags=True), 'English: Brown Corpus (Humor, simplified)': lambda: brown.tagged_sents(categories='humor', simplify_tags=True), 'English: NPS Chat Corpus': lambda: nps_chat.tagged_posts(), 'English: NPS Chat Corpus (simplified)': lambda: nps_chat.tagged_posts(simplify_tags=True), 'English: Wall Street Journal Corpus': lambda: treebank.tagged_sents(), 'English: Wall Street Journal Corpus (simplified)': lambda: treebank.tagged_sents(simplify_tags=True), 'Chinese: Sinica Corpus': lambda: sinica_treebank.tagged_sents(), 'Chinese: Sinica Corpus (simplified)': lambda: sinica_treebank.tagged_sents(simplify_tags=True), 'Dutch: Alpino Corpus': lambda: alpino.tagged_sents(), 'Dutch: Alpino Corpus (simplified)': lambda: alpino.tagged_sents(simplify_tags=True), 'Hindi: Indian Languages Corpus': lambda: indian.tagged_sents(files='hindi.pos'), 'Hindi: Indian Languages Corpus (simplified)': lambda: indian.tagged_sents(files='hindi.pos', simplify_tags=True), 'Portuguese: Floresta Corpus (Portugal)': lambda: floresta.tagged_sents(), 'Portuguese: Floresta Corpus (Portugal, simplified)': lambda: floresta.tagged_sents(simplify_tags=True), 'Portuguese: MAC-MORPHO Corpus (Brazil)': lambda: mac_morpho.tagged_sents(), 'Portuguese: MAC-MORPHO Corpus (Brazil, simplified)': lambda: mac_morpho.tagged_sents(simplify_tags=True), 'Spanish: CESS-ESP Corpus (simplified)': lambda: cess_esp.tagged_sents(simplify_tags=True), } class ConcordanceSearchView(object): _BACKGROUND_COLOUR='#FFF' #white #Colour of highlighted results _HIGHLIGHT_WORD_COLOUR='#F00' #red _HIGHLIGHT_WORD_TAG='HL_WRD_TAG' _HIGHLIGHT_LABEL_COLOUR='#C0C0C0' # dark grey _HIGHLIGHT_LABEL_TAG='HL_LBL_TAG' #Percentage of text left of the scrollbar position _FRACTION_LEFT_TEXT=0.30 def __init__(self): self.model = ConcordanceSearchModel() self.model.add_listener(self) self.top = Tk() self._init_top(self.top) self._init_menubar() self._init_widgets(self.top) self._bind_event_handlers() self.load_corpus(self.model.DEFAULT_CORPUS) def _init_top(self, top): top.geometry('950x680+50+50') top.title('NLTK Concordance Search') top.bind('', self.destroy) top.minsize(950,680) def _init_widgets(self, parent): self.main_frame = Frame(parent, dict(background=self._BACKGROUND_COLOUR, padx=1, pady=1, border=1)) self._init_corpus_select(self.main_frame) self._init_query_box(self.main_frame) self._init_results_box(self.main_frame) self._init_paging(self.main_frame) self._init_status(self.main_frame) self.main_frame.pack(fill='both', expand=True) def _init_menubar(self): self._result_size = IntVar(self.top) self._cntx_bf_len = IntVar(self.top) self._cntx_af_len = IntVar(self.top) menubar = Menu(self.top) filemenu = Menu(menubar, tearoff=0, borderwidth=0) filemenu.add_command(label='Exit', underline=1, command=self.destroy, accelerator='Ctrl-q') menubar.add_cascade(label='File', underline=0, menu=filemenu) editmenu = Menu(menubar, tearoff=0) rescntmenu = Menu(editmenu, tearoff=0) rescntmenu.add_radiobutton(label='20', variable=self._result_size, underline=0, value=20, command=self.set_result_size) rescntmenu.add_radiobutton(label='50', variable=self._result_size, underline=0, value=50, command=self.set_result_size) rescntmenu.add_radiobutton(label='100', variable=self._result_size, underline=0, value=100, command=self.set_result_size) rescntmenu.invoke(1) editmenu.add_cascade(label='Result Count', underline=0, menu=rescntmenu) cntxmenu = Menu(editmenu, tearoff=0) cntxbfmenu = Menu(cntxmenu, tearoff=0) cntxbfmenu.add_radiobutton(label='60 characters', variable=self._cntx_bf_len, underline=0, value=60, command=self.set_cntx_bf_len) cntxbfmenu.add_radiobutton(label='80 characters', variable=self._cntx_bf_len, underline=0, value=80, command=self.set_cntx_bf_len) cntxbfmenu.add_radiobutton(label='100 characters', variable=self._cntx_bf_len, underline=0, value=100, command=self.set_cntx_bf_len) cntxbfmenu.invoke(1) cntxmenu.add_cascade(label='Before', underline=0, menu=cntxbfmenu) cntxafmenu = Menu(cntxmenu, tearoff=0) cntxafmenu.add_radiobutton(label='70 characters', variable=self._cntx_af_len, underline=0, value=70, command=self.set_cntx_af_len) cntxafmenu.add_radiobutton(label='90 characters', variable=self._cntx_af_len, underline=0, value=90, command=self.set_cntx_af_len) cntxafmenu.add_radiobutton(label='110 characters', variable=self._cntx_af_len, underline=0, value=110, command=self.set_cntx_af_len) cntxafmenu.invoke(1) cntxmenu.add_cascade(label='After', underline=0, menu=cntxafmenu) editmenu.add_cascade(label='Context', underline=0, menu=cntxmenu) menubar.add_cascade(label='Edit', underline=0, menu=editmenu) self.top.config(menu=menubar) def set_result_size(self, **kwargs): self.model.result_count = self._result_size.get() def set_cntx_af_len(self, **kwargs): self._char_after = self._cntx_af_len.get() def set_cntx_bf_len(self, **kwargs): self._char_before = self._cntx_bf_len.get() def _init_corpus_select(self, parent): innerframe = Frame(parent, background=self._BACKGROUND_COLOUR) self.var = StringVar(innerframe) self.var.set(self.model.DEFAULT_CORPUS) Label(innerframe, justify=LEFT, text=' Corpus: ', background=self._BACKGROUND_COLOUR, padx = 2, pady = 1, border = 0).pack(side='left') other_corpora = self.model.CORPORA.keys().remove(self.model.DEFAULT_CORPUS) om = OptionMenu(innerframe, self.var, self.model.DEFAULT_CORPUS, command=self.corpus_selected, *self.model.non_default_corpora()) om['borderwidth'] = 0 om['highlightthickness'] = 1 om.pack(side='left') innerframe.pack(side='top', fill='x', anchor='n') def _init_status(self, parent): self.status = Label(parent, justify=LEFT, relief=SUNKEN, background=self._BACKGROUND_COLOUR, border=0, padx = 1, pady = 0) self.status.pack(side='top', anchor='sw') def _init_query_box(self, parent): innerframe = Frame(parent, background=self._BACKGROUND_COLOUR) another = Frame(innerframe, background=self._BACKGROUND_COLOUR) self.query_box = Entry(another, width=60) self.query_box.pack(side='left', fill='x', pady=25, anchor='center') self.search_button = Button(another, text='Search', command=self.search, borderwidth=1, highlightthickness=1) self.search_button.pack(side='left', fill='x', pady=25, anchor='center') self.query_box.bind('', self.search_enter_keypress_handler) another.pack() innerframe.pack(side='top', fill='x', anchor='n') def search_enter_keypress_handler(self, *event): self.search() def _init_results_box(self, parent): innerframe = Frame(parent) i1 = Frame(innerframe) i2 = Frame(innerframe) vscrollbar = Scrollbar(i1, borderwidth=1) hscrollbar = Scrollbar(i2, borderwidth=1, orient='horiz') self.results_box = Text(i1, font=tkFont.Font(family='courier', size='16'), state='disabled', borderwidth=1, yscrollcommand=vscrollbar.set, xscrollcommand=hscrollbar.set, wrap='none', width='40', height = '20', exportselection=1) self.results_box.pack(side='left', fill='both', expand=True) self.results_box.tag_config(self._HIGHLIGHT_WORD_TAG, foreground=self._HIGHLIGHT_WORD_COLOUR) self.results_box.tag_config(self._HIGHLIGHT_LABEL_TAG, foreground=self._HIGHLIGHT_LABEL_COLOUR) vscrollbar.pack(side='left', fill='y', anchor='e') vscrollbar.config(command=self.results_box.yview) hscrollbar.pack(side='left', fill='x', expand=True, anchor='w') hscrollbar.config(command=self.results_box.xview) #there is no other way of avoiding the overlap of scrollbars while using pack layout manager!!! Label(i2, text=' ', background=self._BACKGROUND_COLOUR).pack(side='left', anchor='e') i1.pack(side='top', fill='both', expand=True, anchor='n') i2.pack(side='bottom', fill='x', anchor='s') innerframe.pack(side='top', fill='both', expand=True) def _init_paging(self, parent): innerframe = Frame(parent, background=self._BACKGROUND_COLOUR) self.prev = prev = Button(innerframe, text='Previous', command=self.previous, width='10', borderwidth=1, highlightthickness=1, state='disabled') prev.pack(side='left', anchor='center') self.next = next = Button(innerframe, text='Next', command=self.next, width='10', borderwidth=1, highlightthickness=1, state='disabled') next.pack(side='right', anchor='center') innerframe.pack(side='top', fill='y') self.current_page = 0 def previous(self): self.clear_results_box() self.freeze_editable() self.model.prev(self.current_page - 1) def next(self): self.clear_results_box() self.freeze_editable() self.model.next(self.current_page + 1) def about(self, *e): ABOUT = ("NLTK Concordance Search Demo\n") TITLE = 'About: NLTK Concordance Search Demo' try: from tkMessageBox import Message Message(message=ABOUT, title=TITLE, parent=self.main_frame).show() except: ShowText(self.top, TITLE, ABOUT) def _bind_event_handlers(self): self.top.bind(CORPUS_LOADED_EVENT, self.handle_corpus_loaded) self.top.bind(SEARCH_TERMINATED_EVENT, self.handle_search_terminated) self.top.bind(SEARCH_ERROR_EVENT, self.handle_search_error) self.top.bind(ERROR_LOADING_CORPUS_EVENT, self.handle_error_loading_corpus) def handle_error_loading_corpus(self, event): self.status['text'] = 'Error in loading ' + self.var.get() self.unfreeze_editable() self.clear_all() self.freeze_editable() def handle_corpus_loaded(self, event): self.status['text'] = self.var.get() + ' is loaded' self.unfreeze_editable() self.clear_all() self.query_box.focus_set() def handle_search_terminated(self, event): #todo: refactor the model such that it is less state sensitive results = self.model.get_results() self.write_results(results) self.status['text'] = '' if len(results) == 0: self.status['text'] = 'No results found for ' + self.model.query else: self.current_page = self.model.last_requested_page self.unfreeze_editable() self.results_box.xview_moveto(self._FRACTION_LEFT_TEXT) def handle_search_error(self, event): self.status['text'] = 'Error in query ' + self.model.query self.unfreeze_editable() def corpus_selected(self, *args): new_selection = self.var.get() self.load_corpus(new_selection) def load_corpus(self, selection): if self.model.selected_corpus != selection: self.status['text'] = 'Loading ' + selection + '...' self.freeze_editable() self.model.load_corpus(selection) def search(self): self.current_page = 0 self.clear_results_box() self.model.reset_results() query = self.query_box.get() if (len(query.strip()) == 0): return self.status['text'] = 'Searching for ' + query self.freeze_editable() self.model.search(query, self.current_page + 1, ) def write_results(self, results): self.results_box['state'] = 'normal' row = 1 for each in results: sent, pos1, pos2 = each[0].strip(), each[1], each[2] if len(sent) != 0: if (pos1 < self._char_before): sent, pos1, pos2 = self.pad(sent, pos1, pos2) sentence = sent[pos1-self._char_before:pos1+self._char_after] if not row == len(results): sentence += '\n' self.results_box.insert(str(row) + '.0', sentence) word_markers, label_markers = self.words_and_labels(sent, pos1, pos2) for marker in word_markers: self.results_box.tag_add(self._HIGHLIGHT_WORD_TAG, str(row) + '.' + str(marker[0]), str(row) + '.' + str(marker[1])) for marker in label_markers: self.results_box.tag_add(self._HIGHLIGHT_LABEL_TAG, str(row) + '.' + str(marker[0]), str(row) + '.' + str(marker[1])) row += 1 self.results_box['state'] = 'disabled' def words_and_labels(self, sentence, pos1, pos2): search_exp = sentence[pos1:pos2] words, labels = [], [] labeled_words = search_exp.split(' ') index = 0 for each in labeled_words: if each == '': index += 1 else: word, label = each.split('/') words.append((self._char_before + index, self._char_before + index + len(word))) index += len(word) + 1 labels.append((self._char_before + index, self._char_before + index + len(label))) index += len(label) index += 1 return words, labels def pad(self, sent, hstart, hend): if hstart >= self._char_before: return sent, hstart, hend d = self._char_before - hstart sent = ''.join([' '] * d) + sent return sent, hstart + d, hend + d def destroy(self, *e): if self.top is None: return self.top.destroy() self.top = None def clear_all(self): self.query_box.delete(0, END) self.model.reset_query() self.clear_results_box() def clear_results_box(self): self.results_box['state'] = 'normal' self.results_box.delete("1.0", END) self.results_box['state'] = 'disabled' def freeze_editable(self): self.query_box['state'] = 'disabled' self.search_button['state'] = 'disabled' self.prev['state'] = 'disabled' self.next['state'] = 'disabled' def unfreeze_editable(self): self.query_box['state'] = 'normal' self.search_button['state'] = 'normal' self.set_paging_button_states() def set_paging_button_states(self): if self.current_page == 0 or self.current_page == 1: self.prev['state'] = 'disabled' else: self.prev['state'] = 'normal' if self.model.has_more_pages(self.current_page): self.next['state'] = 'normal' else: self.next['state'] = 'disabled' def fire_event(self, event): #Firing an event so that rendering of widgets happen in the mainloop thread self.top.event_generate(event, when='tail') def mainloop(self, *args, **kwargs): if in_idle(): return self.top.mainloop(*args, **kwargs) class ConcordanceSearchModel(object): def __init__(self): self.listeners = [] self.CORPORA = _CORPORA self.DEFAULT_CORPUS = _DEFAULT self.selected_corpus = None self.reset_query() self.reset_results() self.result_count = None self.last_sent_searched = 0 def non_default_corpora(self): copy = [] copy.extend(self.CORPORA.keys()) copy.remove(self.DEFAULT_CORPUS) copy.sort() return copy def load_corpus(self, name): self.selected_corpus = name self.tagged_sents = [] runner_thread = self.LoadCorpus(name, self) runner_thread.start() def search(self, query, page): self.query = query self.last_requested_page = page self.SearchCorpus(self, page, self.result_count).start() def next(self, page): self.last_requested_page = page if len(self.results) < page: self.search(self.query, page) else: self.notify_listeners(SEARCH_TERMINATED_EVENT) def prev(self, page): self.last_requested_page = page self.notify_listeners(SEARCH_TERMINATED_EVENT) def add_listener(self, listener): self.listeners.append(listener) def notify_listeners(self, event): for each in self.listeners: each.fire_event(event) def reset_results(self): self.last_sent_searched = 0 self.results = [] self.last_page = None def reset_query(self): self.query = None def set_results(self, page, resultset): self.results.insert(page - 1, resultset) def get_results(self): return self.results[self.last_requested_page - 1] def has_more_pages(self, page): if self.results == [] or self.results[0] == []: return False if self.last_page is None: return True return page < self.last_page class LoadCorpus(threading.Thread): def __init__(self, name, model): threading.Thread.__init__(self) self.model, self.name = model, name def run(self): try: ts = self.model.CORPORA[self.name]() self.model.tagged_sents = [' '.join(w+'/'+t for (w,t) in sent) for sent in ts] self.model.notify_listeners(CORPUS_LOADED_EVENT) except Exception, e: print e self.model.notify_listeners(ERROR_LOADING_CORPUS_EVENT) class SearchCorpus(threading.Thread): def __init__(self, model, page, count): self.model, self.count, self.page = model, count, page threading.Thread.__init__(self) def run(self): q = self.processed_query() sent_pos, i, sent_count = [], 0, 0 for sent in self.model.tagged_sents[self.model.last_sent_searched:]: try: m = re.search(q, sent) except re.error: self.model.reset_results() self.model.notify_listeners(SEARCH_ERROR_EVENT) return if m: sent_pos.append((sent, m.start(), m.end())) i += 1 if i > self.count: self.model.last_sent_searched += sent_count - 1 break sent_count += 1 if (self.count >= len(sent_pos)): self.model.last_sent_searched += sent_count - 1 self.model.last_page = self.page self.model.set_results(self.page, sent_pos) else: self.model.set_results(self.page, sent_pos[:-1]) self.model.notify_listeners(SEARCH_TERMINATED_EVENT) def processed_query(self): new = [] for term in self.model.query.split(): term = re.sub(r'\.', r'[^/ ]', term) if re.match('[A-Z]+$', term): new.append(BOUNDARY + WORD_OR_TAG + '/' + term + BOUNDARY) elif '/' in term: new.append(BOUNDARY + term + BOUNDARY) else: new.append(BOUNDARY + term + '/' + WORD_OR_TAG + BOUNDARY) return ' '.join(new) def app(): d = ConcordanceSearchView() d.mainloop() if __name__ == '__main__': app() __all__ = ['app']