# Source code for cluster.data.data_node_text

from cluster.data.data_node import DataNode
from master.workflow.data.workflow_data_text import WorkFlowDataText

from common import utils
import os,h5py
from time import gmtime, strftime
from shutil import copyfile
import numpy as np

class DataNodeText(DataNode):
    """Data node that ingests local text files and stores them as HDF5 files.

    Node parameters (source/store paths, source type, preprocessing type,
    maximum sentence length, ...) are loaded from ``WorkFlowDataText`` by
    ``_init_node_parm``. Only the ``'local'`` source type is implemented.
    """

    def run(self, conf_data):
        """Load this node's parameters and ingest its source data.

        :param conf_data: node configuration; must contain ``'node_id'``.
            It is also forwarded to the source handler.
        :return: None
        :raises Exception: for source types that exist but are not
            implemented yet (``'rdb'``, ``'s3'``, ``'hbase'``).
        :raises ValueError: for an unrecognised source type.
        """
        self._init_node_parm(conf_data['node_id'])
        if self.data_src_type == 'local':
            self.src_local_handler(conf_data)
        elif self.data_src_type in ('rdb', 's3', 'hbase'):
            raise Exception("on development now")
        else:
            # An unknown type used to be ignored silently, which made a
            # misconfigured workflow look like a successful (empty) run.
            raise ValueError(
                "unsupported data source type: {0!r}".format(self.data_src_type))

    def src_local_handler(self, conf_data):
        """Convert every file under ``self.data_src_path`` into an HDF5 file.

        Each file is read and then deleted (see ``_load_local_files``). Its
        content goes through ``_preprocess`` (using
        ``self.data_preprocess_type``) and then ``encode_pad``. The result is
        written to ``self.data_store_path`` by ``_save_hdf5``.

        :param conf_data: node configuration (not used by this handler).
        :return: None
        """
        # Exceptions propagate unchanged: the old ``raise Exception(e)``
        # wrapper discarded the original exception type and traceback.
        for file_path in utils.get_filepaths(self.data_src_path):
            str_buf = self._load_local_files(file_path)
            conv_buf = self.encode_pad(
                self._preprocess(str_buf, type=self.data_preprocess_type))
            self._save_hdf5(conv_buf)

    def _load_local_files(self, file_path):
        """Read a text file as a list of lines, then delete the file.

        :param file_path: path of the file to consume.
        :return: the file's lines as returned by ``readlines`` (line endings
            preserved).
        """
        with open(file_path, 'r') as src_file:
            lines = src_file.readlines()
        # Delete only after the content has been read and the handle closed.
        # The old code removed the file first: that loses the data if the
        # read fails, and on Windows removing an open file raises.
        os.remove(file_path)
        return lines

    def _save_hdf5(self, buffer_list):
        """Write rows to a new HDF5 file named after the current UTC time.

        The file holds a single dataset, ``'rawdata'``, with one entry per
        row. Each entry is an array of ``self.sent_max_len`` variable-length
        strings.

        :param buffer_list: sequence of rows. Each row is converted with
            ``np.array(row, dtype=object)`` before it is stored.
        :return: path of the written file.
        """
        base_name = strftime("%Y-%m-%d-%H:%M:%S", gmtime())
        output_path = os.path.join(self.data_store_path, base_name)
        # Several files can be saved within the same second. Mode 'w' would
        # silently overwrite the earlier file, so append a counter instead.
        suffix = 0
        while os.path.exists(output_path):
            suffix += 1
            output_path = os.path.join(
                self.data_store_path, "{0}_{1}".format(base_name, suffix))

        dt_vlen = h5py.special_dtype(vlen=str)
        dt_arr = np.dtype((dt_vlen, (self.sent_max_len,)))
        # ``chunk=True`` was dropped because it is not an h5py.File option:
        # older h5py ignored it and newer versions raise TypeError. Mode
        # 'w-' refuses to clobber a file that appeared after the check above.
        with h5py.File(output_path, 'w-') as h5file:
            h5raw = h5file.create_dataset(
                'rawdata', (len(buffer_list),), dtype=dt_arr)
            for idx, row in enumerate(buffer_list):
                h5raw[idx] = np.array(row, dtype=object)
        return output_path

    def _init_node_parm(self, key):
        """Load this node's parameters from ``WorkFlowDataText(key)``.

        Sets the ``data_*`` attributes and ``sent_max_len`` that the other
        methods of this class read.

        :param key: node id passed to ``WorkFlowDataText``.
        :return: None
        """
        wf_conf = WorkFlowDataText(key)
        self.data_sql_stmt = wf_conf.get_sql_stmt()
        self.data_src_path = wf_conf.get_source_path()
        self.data_src_type = wf_conf.get_src_type()
        self.data_store_path = wf_conf.get_step_store()
        self.data_server_type = wf_conf.get_src_server()
        self.data_parse_type = wf_conf.get_parse_type()
        self.sent_max_len = wf_conf.get_max_sent_len()
        self.data_preprocess_type = wf_conf.get_step_preprocess()

    def _set_progress_state(self):
        """Placeholder for progress reporting; currently does nothing."""
        return None

    def load_data(self, node_id="", parm='all'):
        """Return the files stored by this node.

        :param node_id: not used by this implementation.
        :param parm: not used by this implementation.
        :return: the result of ``utils.get_filepaths`` for
            ``self.data_store_path`` (presumably a list of file paths).
        """
        # NOTE(review): data_store_path is only set by _init_node_parm, so
        # run() (or _init_node_parm) must have been called on this instance.
        return utils.get_filepaths(self.data_store_path)