Source code for cluster.preprocess.pre_node_merge_text2seq
from cluster.preprocess.pre_node import PreProcessNode
from master.workflow.preprocess.workflow_pre_merge import WorkFlowPreMerge as WFPreMerge
[docs]class PreNodeMergeText2Seq(PreProcessNode):
"""
"""
[docs] def run(self, conf_data):
return True
def _init_node_parm(self, key):
"""
:return:
"""
wf_conf = WFPreMerge(key)
self.batch_size = wf_conf.get_batchsize()
self.merge_rule = wf_conf.get_merge_rule()
self.merge_type = wf_conf.get_type()
self.state_code = wf_conf.get_state_code()
def _set_progress_state(self):
pass
[docs] def load_data(self, node_id, parm = 'all'):
"""
load train data
:param node_id:
:param parm:
:return:
"""
self._init_node_parm(node_id)
if(self.merge_type == 'seq2seq') :
return self._merge_seq2seq_type()
else :
raise Exception ("merge node error: not defined type {0}".format(self.merge_type))
def _merge_seq2seq_type(self):
"""
merge two data node into one for seq2seq anal
:return:
"""
file_lists = []
encode_data = []
encode_node_list = self.merge_rule['encode_node']
if (len(encode_node_list) > 0):
for node_name in encode_node_list:
cls_path, cls_name = self.get_cluster_exec_class(str(self.state_code) + "_" + node_name)
dyna_cls = self.load_class(cls_path, cls_name)
encode_data = encode_data + dyna_cls.load_data(self.state_code + "_" + node_name, parm='all')
file_lists.append(encode_data)
decode_data = []
decode_node_list = self.merge_rule['decode_node']
if (len(decode_node_list) > 0):
for node_name in decode_node_list:
cls_path, cls_name = self.get_cluster_exec_class(self.state_code + "_" + node_name)
dyna_cls = self.load_class(cls_path, cls_name)
decode_data = decode_data + dyna_cls.load_data(self.state_code + "_" + node_name, parm='all')
file_lists.append(decode_data)
return file_lists