Source code for cluster.common.neural_common_bilismcrf

import numpy as np
import os
import h5py
from hanja import hangul

class BiLstmCommon:
    """ common functions for bilstm crf """
    UNK = "$UNK$"
    NUM = "$NUM$"
    NONE = "O"
    def write_char_embedding(self, vocab, trimmed_filename):
        """
        Writes an embedding matrix (one row per vocab entry, built with
        self.get_onehot_vector) to a file

        Args:
            vocab: iterable that yields word
            trimmed_filename: path where the embedding matrix is stored
        """
        try:
            print("Writing vocab...")
            embeddings = np.zeros([len(vocab), 160])
            if type(vocab) == type(set()):
                vocab = list(vocab)
            for i, word in enumerate(vocab):
                word = word.lower()
                embeddings[vocab.index(word)] = np.array(self.get_onehot_vector(word))[0]
            np.savetxt(trimmed_filename, embeddings)
            print("- done. {} tokens".format(len(vocab)))
        except Exception as e:
            print("error on write_char_embedding : {0}".format(e))
    def write_vocab(self, vocab, filename):
        """
        Writes a vocab to a file

        Args:
            vocab: iterable that yields word
            filename: path to vocab file
        Returns:
            write a word per line
        """
        print("Writing vocab...")
        with open(filename, "w+") as f:
            for i, word in enumerate(vocab):
                if i != len(vocab) - 1:
                    f.write("{}\n".format(word))
                else:
                    f.write(word)
        print("- done. {} tokens".format(len(vocab)))
    def get_vocabs(self, datasets, vocab=None, tags=None):
        """
        Args:
            datasets: a list of dataset objects
        Return:
            a set of all the words and a set of all the tags in the datasets
        """
        try:
            print("Building vocab...")
            if vocab is None or tags is None:
                vocab_words = set()
                vocab_tags = set()
            else:
                vocab_words = set(vocab)
                vocab_tags = set(tags)
            for dataset in datasets:
                for words, sent_tags in dataset:
                    vocab_words.update(words)
                    vocab_tags.update(sent_tags)
            print("- done. {} tokens".format(len(vocab_words)))
            return vocab_words, vocab_tags
        except Exception as e:
            print("error on get_vocabs {0}".format(e))
    def load_vocab(self, filename):
        """
        Args:
            filename: file with a word per line
        Returns:
            d: dict[word] = index
        """
        d = dict()
        if not os.path.exists(filename):
            return d
        with open(filename) as f:
            for idx, word in enumerate(f):
                word = word.strip()
                d[word] = idx
        return d
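    # Illustrative round trip for write_vocab/load_vocab above (a sketch, not
    # part of the original module; "words.txt" is a hypothetical path):
    #
    #   common = BiLstmCommon()
    #   common.write_vocab(["the", "cat", "$UNK$"], "words.txt")
    #   vocab = common.load_vocab("words.txt")
    #   # vocab == {"the": 0, "cat": 1, "$UNK$": 2}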
    def export_trimmed_glove_vectors(self, vocab, model, trimmed_filename):
        """
        Saves word vectors from a pre-trained model in a numpy array

        Args:
            vocab: dictionary vocab[word] = index
            model: word-vector model indexable by word (model[word])
            trimmed_filename: a path where to store the matrix
        """
        try:
            embeddings = np.zeros([len(vocab), model.vector_size])
            for word in vocab:
                if word != self.UNK:
                    embeddings[list(vocab).index(word)] = model[word]
            np.savetxt(trimmed_filename, embeddings)
        except Exception as e:
            print("error on export_trimmed_glove_vectors : {0}".format(e))
    def get_trimmed_glove_vectors(self, filename):
        """
        Args:
            filename: path to the file written by export_trimmed_glove_vectors
        Returns:
            matrix of embeddings (np array)
        """
        with open(filename) as f:
            return np.loadtxt(f)
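    # Illustrative sketch of the export/load pair above (not part of the
    # original module). `model` stands for any word-vector model exposing
    # `vector_size` and `model[word]`, and "trimmed.txt" is a hypothetical path:
    #
    #   common = BiLstmCommon()
    #   vocab = common.load_vocab("words.txt")
    #   common.export_trimmed_glove_vectors(vocab, model, "trimmed.txt")
    #   embeddings = common.get_trimmed_glove_vectors("trimmed.txt")
    #   # embeddings.shape == (len(vocab), model.vector_size)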
    def get_char_vocab(self, dataset, chars=None):
        """
        Args:
            dataset: an iterator yielding tuples (sentence, tags)
        Returns:
            a set of all the characters in the dataset
        """
        if chars is None:
            vocab_char = set()
        else:
            vocab_char = set(chars)
        for words, _ in dataset:
            for word in words:
                word = word.lower()
                vocab_char.update(word)
        return vocab_char
    def get_processing_word(self, vocab_words=None, vocab_chars=None,
                            lowercase=False, chars=False):
        """
        Args:
            vocab_words: dict[word] = idx
            vocab_chars: dict[char] = idx
        Returns:
            f("cat") = ([12, 4, 32], 12345)
                     = (list of char ids, word id)
        """
        def f(word):
            # 1. preprocess word
            if lowercase:
                word = word.lower()

            # 2. get char ids of the word
            if vocab_chars is not None and chars == True:
                char_ids = []
                for char in word:
                    # ignore chars out of vocabulary
                    if char in vocab_chars:
                        char_ids += [vocab_chars[char]]

            # 3. get id of word
            if vocab_words is not None:
                if word in vocab_words:
                    word = vocab_words[word]
                else:
                    word = vocab_words[self.UNK]

            # 4. return tuple char ids, word id
            if vocab_chars is not None and chars == True:
                return char_ids, word
            else:
                return word

        return f
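    # Illustrative sketch for get_processing_word (not part of the original
    # module); the ids below follow the toy vocabularies defined here:
    #
    #   common = BiLstmCommon()
    #   word_ids = {"cat": 0, "$UNK$": 1}
    #   char_ids = {"c": 0, "a": 1, "t": 2}
    #   f = common.get_processing_word(word_ids, char_ids, lowercase=True, chars=True)
    #   f("Cat")  # -> ([0, 1, 2], 0): (list of char ids, word id)
    #   f("dog")  # -> ([], 1): no known chars, word maps to $UNK$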
    def minibatches(self, data, minibatch_size):
        """
        Args:
            data: generator of (sentence, tags) tuples
            minibatch_size: (int)
        Returns:
            list of tuples
        """
        x_batch, y_batch = [], []
        for (x, y) in data:
            if len(x_batch) == minibatch_size:
                yield x_batch, y_batch
                x_batch, y_batch = [], []

            if type(x[0]) == tuple:
                x = zip(*x)
            x_batch += [x]
            y_batch += [y]

        if len(x_batch) != 0:
            yield x_batch, y_batch
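    # Illustrative sketch for minibatches (not part of the original module):
    #
    #   common = BiLstmCommon()
    #   data = [([1, 2], [0, 0]), ([3], [1]), ([4, 5], [0, 1])]
    #   list(common.minibatches(data, minibatch_size=2))
    #   # -> [([[1, 2], [3]], [[0, 0], [1]]), ([[4, 5]], [[0, 1]])]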
    def _pad_sequences(self, sequences, pad_tok, max_length):
        """
        Args:
            sequences: a generator of list or tuple
            pad_tok: the char to pad with
        Returns:
            a list of list where each sublist has same length
        """
        sequence_padded, sequence_length = [], []
        for seq in sequences:
            seq = list(seq)
            seq_ = seq[:max_length] + [pad_tok] * max(max_length - len(seq), 0)
            sequence_padded += [seq_]
            sequence_length += [min(len(seq), max_length)]

        return sequence_padded, sequence_length
    def get_chunks(self, seq, tags):
        """
        Args:
            seq: [4, 4, 0, 0, ...] sequence of labels
            tags: dict["O"] = 4
        Returns:
            list of (chunk_type, chunk_start, chunk_end)
        Example:
            seq = [4, 5, 0, 3]
            tags = {"B-PER": 4, "I-PER": 5, "B-LOC": 3}
            result = [("PER", 0, 2), ("LOC", 3, 4)]
        """
        try:
            default = tags[self.NONE]
            idx_to_tag = {idx: tag for tag, idx in iter(tags.items())}
            chunks = []
            chunk_type, chunk_start = None, None
            for i, tok in enumerate(seq):
                # End of a chunk 1
                if tok == default and chunk_type is not None:
                    # Add a chunk.
                    chunk = (chunk_type, chunk_start, i)
                    chunks.append(chunk)
                    chunk_type, chunk_start = None, None

                # End of a chunk + start of a chunk!
                elif tok != default:
                    tok_chunk_type = self.get_chunk_type(tok, idx_to_tag)
                    if chunk_type is None:
                        chunk_type, chunk_start = tok_chunk_type, i
                    # elif tok_chunk_type != chunk_type or tok[0] == "B":
                    elif tok_chunk_type != chunk_type:
                        chunk = (chunk_type, chunk_start, i)
                        chunks.append(chunk)
                        chunk_type, chunk_start = tok_chunk_type, i
                else:
                    pass

            # end condition
            if chunk_type is not None:
                chunk = (chunk_type, chunk_start, len(seq))
                chunks.append(chunk)

            return chunks
        except Exception as e:
            raise Exception(e)
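    # Illustrative sketch for get_chunks, mirroring the docstring example (not
    # part of the original module; note the tag dict must also contain "O"):
    #
    #   common = BiLstmCommon()
    #   tags = {"B-PER": 4, "I-PER": 5, "B-LOC": 3, "O": 0}
    #   common.get_chunks([4, 5, 0, 3], tags)
    #   # -> [("PER", 0, 2), ("LOC", 3, 4)]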
    def pad_sequences(self, sequences, pad_tok, nlevels=1):
        """
        Args:
            sequences: a generator of list or tuple
            pad_tok: the char to pad with
        Returns:
            a list of list where each sublist has same length
        """
        if nlevels == 1:
            max_length = max(map(lambda x: len(x), sequences))
            sequence_padded, sequence_length = self._pad_sequences(sequences, pad_tok, max_length)
        elif nlevels == 2:
            max_length_word = max([max(map(lambda x: len(x), seq)) for seq in sequences])
            sequence_padded, sequence_length = [], []
            for seq in sequences:
                # all words are same length now
                sp, sl = self._pad_sequences(seq, pad_tok, max_length_word)
                sequence_padded += [sp]
                sequence_length += [sl]

            max_length_sentence = max(map(lambda x: len(x), sequences))
            sequence_padded, _ = self._pad_sequences(sequence_padded, [pad_tok] * max_length_word, max_length_sentence)
            sequence_length, _ = self._pad_sequences(sequence_length, 0, max_length_sentence)

        return sequence_padded, sequence_length
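    # Illustrative sketch for pad_sequences (not part of the original module):
    #
    #   common = BiLstmCommon()
    #   common.pad_sequences([[1, 2, 3], [4]], pad_tok=0)
    #   # -> ([[1, 2, 3], [4, 0, 0]], [3, 1])
    #   common.pad_sequences([[[1, 2], [3]], [[4]]], pad_tok=0, nlevels=2)
    #   # -> ([[[1, 2], [3, 0]], [[4, 0], [0, 0]]], [[2, 1], [1, 0]])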
    def get_chunk_type(self, tok, idx_to_tag):
        """ Returns the chunk type of a tag id, e.g. "PER" for "B-PER" """
        tag_name = idx_to_tag[tok]
        return tag_name.split('-')[-1]
class CoNLLDataset(object):
    """ Class that iterates over CoNLL Dataset """

    def __init__(self, filename, processing_word=None, processing_tag=None,
                 max_iter=None, all_line=True):
        """
        Args:
            filename: path to the file
            processing_word: (optional) function that takes a word as input
            processing_tag: (optional) function that takes a tag as input
            max_iter: (optional) max number of sentences to yield
        """
        self.filename = filename
        self.processing_word = processing_word
        self.processing_tag = processing_tag
        self.max_iter = max_iter
        self.length = None
        self.all_line = all_line

    def __iter__(self):
        try:
            niter = 0
            with open(self.filename) as f:
                words, tags = [], []
                for line in f:
                    line = line.strip()
                    if len(line) == 0 or line.startswith("-DOCSTART-"):
                        if len(words) != 0:
                            niter += 1
                            if self.max_iter is not None and niter > self.max_iter:
                                break
                            yield words, tags
                            words, tags = [], []
                    else:
                        # skip malformed lines with fewer than two columns;
                        # for lines with extra columns keep only the last two (word, tag)
                        if len(line.split(' ')) < 2:
                            continue
                        if len(line.split(' ')) > 2:
                            temp = line.split(' ')
                            line = ' '.join([temp[len(temp) - 2], temp[len(temp) - 1]])
                        word, tag = line.split(' ')
                        if self.processing_word is not None:
                            word = self.processing_word(word)
                        if self.processing_tag is not None:
                            tag = self.processing_tag(tag)
                        words += [word]
                        tags += [tag]
        except Exception as e:
            raise Exception(e)

    def __len__(self):
        """ Iterates once over the corpus to set and store length """
        if self.length is None:
            self.length = 0
            for _ in self:
                self.length += 1

        return self.length
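# Illustrative end-to-end sketch (not part of the original module). It assumes
# a whitespace-separated "word tag" file at the hypothetical path
# "data/train.txt", with blank lines between sentences:
#
#   common = BiLstmCommon()
#   processing_word = common.get_processing_word(lowercase=True)
#   train = CoNLLDataset("data/train.txt", processing_word=processing_word)
#   vocab_words, vocab_tags = common.get_vocabs([train])
#   common.write_vocab(vocab_words, "words.txt")
#   common.write_vocab(vocab_tags, "tags.txt")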