Source code for cluster.data.data_node_iob

from cluster.data.data_node import DataNode
from master.workflow.data.workflow_data_iob import WorkFlowDataIob
from common import utils
import os
import h5py
from time import gmtime, strftime
from cluster.service.service_predict_w2v import PredictNetW2V
from common.utils import *
from cluster.common.neural_common_bilismcrf import BiLstmCommon

class DataNodeIob(DataNode, BiLstmCommon):
    def run(self, conf_data):
        """
        run at train time : the data node collects data from the source,
        preprocesses it and stores it on NAS
        :param conf_data:
        :return:
        """
        self._init_node_parm(conf_data['node_id'])
        if (self.data_src_type == 'local'):
            self.src_local_handler(conf_data)
        if (self.data_src_type == 'rdb'):
            raise Exception("on development now")
        if (self.data_src_type == 's3'):
            raise Exception("on development now")
        if (self.data_src_type == 'hbase'):
            raise Exception("on development now")
    def src_local_handler(self, conf_data):
        """
        read data from the local file system
        :param conf_data:
        :return:
        """
        fp_list = []
        try:
            # init values
            vocab_words = None
            vocab_tags = None
            vocab_chars = None

            # get word embedding model
            parm = {"type": "model", "val_1": {}, "val_2": []}
            embed_model = PredictNetW2V().run(self.word_embed_model, parm)

            # read files from the source folder (handle one by one)
            fp_list = utils.get_filepaths(self.data_src_path, file_type='iob')
            if (len(fp_list) == 0):
                return None

            netconf_node = self.get_linked_next_node_with_grp('netconf')
            if (len(netconf_node) > 0):
                store_path = get_model_path(netconf_node[0].get_net_id(),
                                            netconf_node[0].get_net_ver(),
                                            netconf_node[0].get_net_node_id())
                # create dict folder for ner if it does not exist
                netconf_path = ''.join([store_path, '/dict/'])
                if not os.path.exists(netconf_path):
                    os.makedirs(netconf_path)
                vocab_words = self.load_vocab(''.join([netconf_path, 'words.txt']))
                vocab_tags = self.load_vocab(''.join([netconf_path, 'tags.txt']))
            else:
                return None

            for file_path in fp_list:
                # data generators
                dev = self.CoNLLDataset(file_path)
                train = self.CoNLLDataset(file_path)

                # get distinct vocab and chars
                vocab_words, vocab_tags = self.get_vocabs([train, dev],
                                                          vocab=vocab_words,
                                                          tags=vocab_tags)
                vocab = vocab_words & set(embed_model.wv.index2word)
                vocab.add(self.UNK)
                vocab_chars = self.get_char_vocab(train, chars=vocab_chars)

                # write dict and vectors for train
                self.write_char_embedding(vocab_chars, ''.join([netconf_path, 'char.vec']))
                self.write_vocab(vocab_chars, ''.join([netconf_path, 'chars.txt']))
                self.write_vocab(vocab, ''.join([netconf_path, 'words.txt']))
                self.write_vocab(vocab_tags, ''.join([netconf_path, 'tags.txt']))
                self.export_trimmed_glove_vectors(vocab, embed_model,
                                                  ''.join([netconf_path, 'words.vec']))
        except Exception as e:
            raise Exception(e)
        finally:
            for file_path in fp_list:
                # move source file to the store path
                str_buf = self._load_local_files(file_path)
                self._save_raw_file(str_buf)
    def _save_raw_file(self, buffer_list):
        file_name = ''.join([strftime("%Y-%m-%d-%H:%M:%S", gmtime()), '.iob'])
        output_path = os.path.join(self.data_store_path, file_name)
        with open(output_path, 'w+') as f:
            for line in buffer_list:
                f.write("%s " % line)
            f.flush()

    def _load_local_files(self, file_path):
        """
        read the raw lines of a local source file and delete the file afterwards
        :return:
        """
        with open(file_path, 'r') as myfile:
            lines = myfile.readlines()
        os.remove(file_path)
        return lines

    def _init_node_parm(self, key):
        """
        init params by using master classes (handling params)
        :return:
        """
        try:
            wf_conf = WorkFlowDataIob(key)
            self.data_sql_stmt = wf_conf.get_sql_stmt()
            self.data_src_path = wf_conf.get_source_path()
            self.data_src_type = wf_conf.get_src_type()
            self.data_store_path = wf_conf.get_step_store()
            self.data_server_type = wf_conf.get_src_server()
            self.data_parse_type = wf_conf.get_parse_type()
            self.data_preprocess_type = wf_conf.get_step_preprocess()
            self.preprocess_type = self.data_preprocess_type
            self.word_embed_model = wf_conf.get_word_embed_model()
        except Exception as e:
            raise Exception("error on initializing data_iob node : {0}".format(e))

    def _set_progress_state(self):
        return None
    def load_data(self, node_id="", parm='all'):
        """
        load train data
        :param node_id:
        :param parm:
        :return:
        """
        try:
            file_path = utils.get_filepaths(self.data_store_path, 'iob')
            return file_path
        except Exception as e:
            raise Exception(e)
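For orientation, the sketch below shows one plausible way this node could be driven on its own. It is a minimal, assumption-laden example and not part of the module: the node id and the shape of conf_data are illustrative, and it presumes the surrounding workflow has already been registered so that WorkFlowDataIob(node_id) can resolve the source path, store path and word-embedding model, and that DataNodeIob() can be constructed without arguments.

# Hypothetical usage sketch (not part of the module source).
from cluster.data.data_node_iob import DataNodeIob

node_id = "nn00001_1_dataconf_node"   # illustrative node id (assumption)
conf_data = {"node_id": node_id}      # run() in this module only reads conf_data['node_id']

data_node = DataNodeIob()             # assumes a no-argument constructor
data_node.run(conf_data)              # read .iob files, build dicts and vectors, move raw files

# after run() the raw .iob files sit under the configured store path;
# load_data() returns their paths for the downstream train node
train_files = data_node.load_data(node_id=node_id)
print(train_files)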