Source code for cluster.preprocess.pre_node_feed_fr2attn
from cluster.preprocess.pre_node_feed import PreNodeFeed
from master.workflow.preprocess.workflow_feed_fr2auto import WorkflowFeedFr2Auto
import pandas as pd
import warnings
import numpy as np
from functools import reduce
from konlpy.tag import Mecab
from common.utils import *
[docs]class PreNodeFeedFr2Attn(PreNodeFeed):
"""
"""
[docs] def run(self, conf_data):
"""
override init class
"""
super(PreNodeFeedFr2Attn, self).run(conf_data)
self._init_node_parm(conf_data['node_id'])
def _get_node_parm(self, node_id):
"""
return conf master class
:return:
"""
return WorkflowFeedFr2Auto(node_id)
def _init_node_parm(self, node_id):
"""
:param node_id:
:return:
"""
try:
wf_conf = WorkflowFeedFr2Auto(node_id)
self.wf_conf = wf_conf
self.preprocess_type = wf_conf.get_preprocess_type()
if (self.preprocess_type in ['frame']):
self._init_frame_node_parm(wf_conf)
elif (self.preprocess_type in ['mecab', 'kkma', 'twitter']):
self._init_nlp_node_parm(wf_conf)
insert_dict = {}
for key in list(self.__dict__.keys()) :
if key in ['preprocess_type','encode_col', 'encode_len', 'embed_type',
'word_vector_size', 'input_size','encode_dtype'] :
insert_dict[key] = self.__dict__[key]
wf_conf.update_view_obj(node_id, insert_dict)
except Exception as e:
raise Exception(e)
def _init_frame_node_parm(self, wf_conf):
"""
init parms when data type is frame
:param wf_conf:
:return:
"""
self.encode_col = wf_conf.get_encode_column()
self.encode_len = {}
self.encode_dtype = {}
self.encode_onehot = {}
self.embed_type = wf_conf.get_embed_type()
self.word_vector_size = wf_conf.get_vocab_size() + 4
self.input_size = 0
if (self.embed_type == 'onehot'):
if(wf_conf.get_vocab_list()) :
encoder_value_list = wf_conf.get_vocab_list()
for col_name in list(encoder_value_list.keys()):
self.encode_onehot[col_name] = OneHotEncoder(self.word_vector_size)
self.encode_onehot[col_name].restore(encoder_value_list.get(col_name))
self._init_frame_node_parm_with_data()
def _init_frame_node_parm_with_data(self):
"""
init pamr s need to be calculated
:return:s
"""
try :
store = pd.HDFStore(self.input_paths[0])
chunk = store.select('table1',
start=0,
stop=100)
for col_name in self.encode_col:
if (self.encode_len.get(col_name) == None):
if (chunk[col_name].dtype in ['int', 'float']):
self.encode_len[col_name] = 1
self.input_size = self.input_size + 1
else:
self.encode_len[col_name] = self.word_vector_size
self.input_size = self.input_size + self.word_vector_size
self.encode_onehot[col_name] = OneHotEncoder(self.word_vector_size)
self.encode_dtype[col_name] = str(chunk[col_name].dtype)
except Exception as e :
raise Exception ("error on wcnn feed parm prepare : {0}".format(e))
def _init_nlp_node_parm(self, wf_conf):
"""
init parms when data type is nlp
:param wf_conf:
:return:
"""
self.encode_col = wf_conf.get_encode_column()
self.encode_len = wf_conf.get_encode_len()
self.embed_type = wf_conf.get_embed_type()
self.word_vector_size = wf_conf.get_vocab_size() + 4
self.input_size = int(self.encode_len) * int(self.word_vector_size)
if (self.embed_type == 'onehot'):
self.onehot_encoder = OneHotEncoder(self.word_vector_size)
if (wf_conf.get_vocab_list()):
self.onehot_encoder.restore(wf_conf.get_vocab_list())
def _convert_data_format(self, file_path, index):
"""
convert variois data to matrix fit to autoencoder
:param obj:
:param index:
:return:
"""
if(self.preprocess_type in ['frame']) :
return self._frame_parser(file_path, index)
elif(self.preprocess_type in ['mecab', 'kkma', 'twitter']) :
return self._nlp_parser(file_path, index)
def _frame_parser(self, file_path, index):
"""
parse nlp data
:return:
"""
try :
store = pd.HDFStore(file_path)
chunk = store.select('table1',
start=index.start,
stop=index.stop)
input_vector = []
count = index.stop - index.start
for col_name in self.encode_col:
if (chunk[col_name].dtype == 'O'):
input_vector.append(list(map(lambda x: self.encode_onehot[col_name].get_vector(x),
chunk[col_name][0:count].tolist())))
else :
input_vector.append(np.array(list(map(lambda x: [self._filter_nan(x)], chunk[col_name][0:count].tolist()))))
return self._flat_data(input_vector, len(chunk[col_name][0:count].tolist()))
except Exception as e :
raise Exception (e)
finally:
store.close()
def _filter_nan(self, x):
"""
map nan to 0
:param x:
:return:
"""
import math
if(math.isnan(x)) :
return 0.0
else :
return x
def _flat_data(self, input_vector, count):
"""
:param input_vector:
:return:
"""
try :
result = []
for i in range(count) :
row = []
for col in input_vector :
row = row + col[i].tolist()
result.append(row)
return np.array(result, dtype='f')
except Exception as e :
raise Exception ("wcnn data prepare flat_data error : {0}".format(e))
def _nlp_parser(self, file_path, index):
"""
parse nlp data
:return:
"""
try :
store = pd.HDFStore(file_path)
chunk = store.select('table1',
start=index.start,
stop=index.stop)
count = index.stop - index.start
if (self.encode_col in chunk):
encode = self.encode_pad(self._preprocess(chunk[self.encode_col].values)[0:count],
max_len=self.encode_len)
return self._word_embed_data(self.embed_type, encode)
else:
warnings.warn("not exists column names requested !!")
return [['#'] * self.encode_len]
except Exception as e :
raise Exception (e)
finally:
store.close()
def _preprocess(self, input_data):
"""
:param input_data:
:return:
"""
if(self.preprocess_type == 'mecab') :
return self._mecab_parse(input_data)
elif (self.preprocess_type == 'kkma'):
return self._mecab_parse(input_data)
elif (self.preprocess_type == 'twitter'):
return self._mecab_parse(input_data)
else :
return input_data
[docs] def data_size(self):
"""
get data array size of this calss
:return:
"""
try :
store = pd.HDFStore(self.input_paths[self.pointer])
table_data = store.select('table1')
return table_data[table_data.columns.values[0]].count()
except Exception as e :
raise Exception (e)
finally:
store.close()
[docs] def has_next(self):
"""
check if hdf5 file pointer has next
:return:
"""
if(len(self.input_paths) > self.pointer) :
return True
else :
if (self.preprocess_type in ['frame']):
if (self.embed_type == 'onehot'):
save_dic = {}
for col_name in self.encode_col:
save_dic[col_name] = self.encode_onehot[col_name].dics()
self.wf_conf.set_vocab_list(save_dic)
return False
elif (self.preprocess_type in ['mecab', 'kkma', 'twitter']):
if (self.embed_type == 'onehot'):
self.wf_conf.set_vocab_list(self.onehot_encoder.dics())
return False