from cluster.neuralnet.neuralnet_node import NeuralNetNode
from master.workflow.netconf.workflow_netconf_seq2seq import WorkFlowNetConfSeq2Seq as WfNetconfSeq2Seq
from cluster.service.service_predict_w2v import PredictNetW2V
import numpy as np
import tensorflow as tf
import logging
from common.utils import *
from konlpy.tag import Mecab
from common.graph.nn_graph_manager import NeuralNetModel
class NeuralNetNodeSeq2Seq(NeuralNetNode):
    """
    Seq2seq network node built on ``tf.contrib.legacy_seq2seq.basic_rnn_seq2seq``.

    ``run`` trains the model and checkpoints it, ``predict`` answers one raw
    sentence using a graph cached in ``NeuralNetModel``, and ``eval`` decodes
    every sample of a data set into a result object. Words are embedded either
    through a linked word2vec network (``word_embed_type == 'w2v'``) or a
    one-hot encoder (``word_embed_type == 'onehot'``).
    """

    # Extra slots added on top of the configured vocabulary size.
    # NOTE(review): presumably reserved special tokens; confirm the exact set.
    _EXTRA_VOCAB_SLOTS = 4

    # Mecab dictionary used to POS-tag raw predict input.
    _MECAB_DIC_PATH = '/usr/local/lib/mecab/dic/mecab-ko-dic'

    def run(self, conf_data):
        """
        Train the seq2seq model for the node in ``conf_data['node_id']``.

        Restores the eval-batch checkpoint when one exists, trains for
        ``num_epochs`` over the linked 'preprocess' data node and saves the
        weights under a newly created batch directory.

        :param conf_data: dict containing at least ``'node_id'``
        :return: None
        :raises Exception: any training error is logged and re-raised
        """
        try:
            node_id = conf_data['node_id']
            self._init_node_parm(node_id)
            # the first linked node of the 'preprocess' group supplies the data
            train_data_set = self.get_linked_prev_node_with_grp('preprocess')[0]

            tf.reset_default_graph()
            self._set_train_model()

            with tf.Session() as sess:
                sess.run(tf.global_variables_initializer())
                saver = tf.train.Saver(tf.global_variables())
                # continue training from the previous checkpoint when present
                if self.check_batch_exist(node_id):
                    path = ''.join([self.md_store_path, '/', self.get_eval_batch(node_id), '/'])
                    set_filepaths(path)
                    saver.restore(sess, path)

                for self.epoch in range(self.num_epochs):
                    # the data node is consumed chunk by chunk (has_next/next),
                    # and every chunk is cut into fixed-size batches
                    while train_data_set.has_next():
                        for i in range(0, train_data_set.data_size(), self.batch_size):
                            data_set = train_data_set[i:i + self.batch_size]
                            # placeholders have a fixed batch dimension, so a
                            # trailing partial batch is skipped
                            if len(data_set[0]) != self.batch_size:
                                break
                            targets = self._get_dict_id(data_set[1])
                            # NOTE(review): the decoder input is built from the same
                            # sequence as the targets (not shifted by one step);
                            # confirm the preprocess node supplies the intended offset.
                            decode_batch = self._word_embed_data(data_set[1])
                            encode_batch = self._word_embed_data(data_set[0])
                            self._run_train(sess, encode_batch, decode_batch, targets)
                        train_data_set.next()
                    train_data_set.reset_pointer()

                # save the trained weights under a new batch version
                path = ''.join([self.md_store_path, '/', self.make_batch(node_id)[1], '/'])
                set_filepaths(path)
                saver.save(sess, path)
        except Exception as e:
            logging.exception("[BasicSeq2Seq Train Process] : {0}".format(e))
            raise
        finally:
            # Persist the one-hot dictionary built while training. getattr keeps
            # a failure inside _init_node_parm from being masked by an
            # AttributeError raised here.
            if (getattr(self, 'word_embed_type', None) == 'onehot'
                    and getattr(self, 'onehot_encoder', None) is not None):
                self.wf_conf.set_vocab_list(self.onehot_encoder.dics())

    def _init_node_parm(self, node_id):
        """
        Load the seq2seq network configuration for ``node_id`` onto ``self``.

        For 'w2v' embeddings the vector size comes from the linked word2vec
        network; when the stored vocab size differs from that network's vocab
        size, the model store directory is deleted and the new size is stored.
        For 'onehot' embeddings a OneHotEncoder is created and restored from the
        stored vocab list when one exists.

        :param node_id: id of this seq2seq node
        :return: None
        :raises Exception: "seq2seq netconf parms not set" on any failure
            (the underlying error is chained as ``__cause__``)
        """
        try:
            # A single conf instance serves every read and write-back
            # (set_vocab_size here, set_vocab_list in run()), so the updates do
            # not go through two separately loaded copies of the config.
            wf_conf = WfNetconfSeq2Seq(node_id)
            self.wf_conf = wf_conf
            self.md_store_path = wf_conf.get_model_store_path()
            self.cell_type = wf_conf.get_cell_type()
            self.decoder_num_layers = wf_conf.get_decoder_depth()
            self.decoder_seq_length = wf_conf.get_decoder_len()
            self.drop_out = wf_conf.get_drop_out()
            self.encoder_num_layers = wf_conf.get_encoder_depth()
            self.encoder_seq_length = wf_conf.get_encoder_len()
            self.word_embed_type = wf_conf.get_word_embed_type()
            self.cell_size = wf_conf.get_cell_size()

            if self.word_embed_type == 'w2v':
                self.word_embed_id = wf_conf.get_word_embed_id()
                self.word_vector_size = self._get_w2v_vector_size(self.word_embed_id)
                stored_vocab_size = wf_conf.get_vocab_size()
                # one round-trip to the word2vec network instead of up to three
                w2v_vocab_size = self._get_vocab_size()
                if stored_vocab_size is not None and stored_vocab_size == w2v_vocab_size:
                    self.vocab_size = stored_vocab_size + self._EXTRA_VOCAB_SLOTS
                else:
                    # vocab size changed: softmax_w/softmax_b are sized by
                    # vocab_size, so the stored model files are discarded
                    del_filepaths(self.md_store_path)
                    wf_conf.set_vocab_size(w2v_vocab_size)
                    self.vocab_size = w2v_vocab_size + self._EXTRA_VOCAB_SLOTS
            elif self.word_embed_type == 'onehot':
                self.word_vector_size = wf_conf.get_vocab_size() + self._EXTRA_VOCAB_SLOTS
                self.vocab_size = self.word_vector_size
                self.onehot_encoder = OneHotEncoder(self.word_vector_size)
                vocab_list = wf_conf.get_vocab_list()
                if vocab_list:
                    self.onehot_encoder.restore(vocab_list)

            # NOTE(review): grad_clip is not used by _set_train_model.
            self.grad_clip = 5.
            self.learning_rate = wf_conf.get_learn_rate()
            # NOTE(review): decay_rate is read from the learn-rate setting and is
            # not used anywhere in this class; confirm intent.
            self.decay_rate = wf_conf.get_learn_rate()
            self.num_epochs = wf_conf.get_iter_size()
            self.batch_size = wf_conf.get_batch_size()
            # predict/eval graphs are always fed one sequence at a time
            self.predict_batch = 1
        except Exception as e:
            raise Exception("seq2seq netconf parms not set") from e

    def _get_w2v_vector_size(self, nn_id):
        """
        Return the vector size configured on the word2vec network ``nn_id``.

        Loads that network's node class, initialises its parameters and returns
        its instance attribute ``vector_size``; falls back to 10 when unset.

        :param nn_id: id of the word2vec network
        :return: vector size (int)
        """
        node_id = self._find_netconf_node_id(nn_id)
        _path, _cls = self.get_cluster_exec_class(node_id)
        w2v_node = self.load_class(_path, _cls)
        w2v_node._init_node_parm(node_id)
        # only an instance attribute counts (class attributes are ignored)
        if 'vector_size' in w2v_node.__dict__:
            return w2v_node.vector_size
        return 10

    def _get_linked_prev_node_has_vector(self):
        """
        Return ``sent_max_len`` of the first linked previous node defining it.

        :return: that node's ``sent_max_len``, or 100 when no such node is linked
        """
        objs = self.get_linked_prev_node_with_cond('sent_max_len')
        if objs:
            return objs[0].sent_max_len
        return 100

    def _set_progress_state(self):
        """Progress reporting is not implemented for this node."""
        return None

    def predict(self, node_id, parm=None):
        """
        Decode one raw input sentence with the eval-batch model of ``node_id``.

        The prediction graph is built once per ``node_id``/eval-batch key and
        cached in ``NeuralNetModel.dict``/``NeuralNetModel.graph``; later calls
        reuse the cached node object and graph.

        :param node_id: id of this seq2seq node
        :param parm: dict with
            ``input_data`` - raw sentence, POS-tagged with Mecab before decoding,
            ``num`` - number of extra candidates (``num + 1`` responses; default 0),
            ``clean_ans`` - return cleaned strings (True, default) or word lists.
            ``None`` uses ``{"input_data": {}, "num": 0, "clean_ans": True}``.
        :return: list of responses (see _run_predict)
        :raises Exception: "seq2seq predict error : ..." wrapping any failure
        """
        if parm is None:
            parm = {"input_data": {}, "num": 0, "clean_ans": True}
        try:
            unique_key = '_'.join([node_id, self.get_eval_batch(node_id)])
            self.node_id = node_id
            self._init_node_parm(self.node_id)
            tf.reset_default_graph()

            # reuse the cached node/graph for this key, or build and cache them
            model = NeuralNetModel.dict.get(unique_key)
            if model:
                graph = NeuralNetModel.graph.get(unique_key)
            else:
                model = self
                model._set_predict_model()
                graph = tf.get_default_graph()
                NeuralNetModel.dict[unique_key] = model
                NeuralNetModel.graph[unique_key] = graph

            num = parm.get("num")
            clean_ans = parm.get("clean_ans")
            # the context manager closes the session on every exit path
            with tf.Session(graph=graph) as sess:
                sess.run(model.init_val)
                return model._run_predict(sess, parm['input_data'],
                                          predict_num=num if num is not None else 0,
                                          clean_ans=clean_ans if clean_ans is not None else True,
                                          batch_ver='eval',  # TODO : need to be predict version
                                          saver=model.saver)
        except Exception as e:
            raise Exception("seq2seq predict error : {0}".format(e)) from e

    def eval(self, node_id, conf, data=None, result=None):
        """
        Decode every sample of ``data`` and record it in ``result``.

        NOTE(review): this method does not call _init_node_parm; the network
        parameters (cell_size, sequence lengths, embedding type, ...) must
        already be set on ``self`` - presumably by a preceding run(). Confirm.

        :param node_id: id of this seq2seq node
        :param conf: unused
        :param data: data node iterated with has_next/next; each sample is
            (input words, expected words)
        :param result: result object filled through set_result_info
        :return: ``result``
        :raises Exception: "seq2seq eval error : ..." wrapping any failure
        """
        sess = None
        try:
            result.set_result_data_format({})
            tf.reset_default_graph()
            sess = tf.Session()
            self._set_predict_model()
            # initialise after the graph exists (the checkpoint restore in
            # _run_predict then overwrites the saved variables)
            sess.run(self.init_val)
            self.node_id = node_id
            while data.has_next():
                for i in range(0, data.data_size(), self.predict_batch):
                    data_set = data[i:i + self.predict_batch]
                    if len(data_set[0]) != self.predict_batch:
                        break
                    # reuse the model's Saver: creating one per sample would add
                    # new save/restore ops to the graph on every iteration
                    predict = self._run_predict(sess, data_set[0][0], type='pre',
                                                clean_ans=False, saver=self.saver)
                    result.set_result_info(' '.join(data_set[1][0]), ' '.join(predict[0]),
                                           input=' '.join(data_set[0][0]), acc=None)
                data.next()
            return result
        except Exception as e:
            raise Exception("seq2seq eval error : {0}".format(e)) from e
        finally:
            # sess is still None when the failure happened before it was created
            if sess is not None:
                sess.close()

    def _w2v_request(self, req_type, val_1, **extra):
        """
        Send one request to the linked word2vec network.

        :param req_type: value of the request's "type" field
        :param val_1: value of the request's "val_1" field
        :param extra: additional request fields (e.g. ``prob_idx``)
        :return: result of ``PredictNetW2V().run`` for the request
        """
        parm = {"type": req_type, "val_1": val_1, "val_2": []}
        parm.update(extra)
        return PredictNetW2V().run(self.word_embed_id, parm)

    def _word_embed_data(self, input_data):
        """
        Convert sequences of words into sequences of word vectors.

        :param input_data: iterable of word sequences
        :return: list with one vector sequence per input sequence
        :raises Exception: for an unsupported ``word_embed_type``
        """
        if self.word_embed_type == 'w2v':
            return [self._w2v_request("train", data) for data in input_data]
        if self.word_embed_type == 'onehot':
            return [[self.onehot_encoder.get_vector(row) for row in data] for data in input_data]
        raise Exception("[Error] seq2seq train - word embeding : not defined type {0}".format(self.word_embed_type))

    def _get_dict_id(self, input_data):
        """
        Convert sequences of words into sequences of vocabulary ids.

        :param input_data: iterable of word sequences
        :return: list with one id sequence per input sequence
        :raises Exception: for an unsupported ``word_embed_type``
        """
        if self.word_embed_type == 'w2v':
            return [self._w2v_request("dict", data) for data in input_data]
        if self.word_embed_type == 'onehot':
            return [[self.onehot_encoder.get_idx(row) for row in data] for data in input_data]
        raise Exception("[Error] seq2seq train - word embeding : not defined type {0}".format(self.word_embed_type))

    def _get_vec2word(self, input_data):
        """
        Convert vector sequences back into words (word2vec embeddings only).

        :param input_data: iterable of vector sequences
        :return: list with one word sequence per input sequence
        :raises Exception: when ``word_embed_type`` is not 'w2v'
        """
        if self.word_embed_type == 'w2v':
            return [self._w2v_request("vec2word", data) for data in input_data]
        raise Exception("[Error] seq2seq train - word embeding : not defined type {0}".format(self.word_embed_type))

    def _get_index2vocab(self, input_data, prob_idx=0):
        """
        Convert rows of softmax probabilities into vocabulary words.

        :param input_data: iterable of probability-row sequences
        :param prob_idx: passed through to the lookup; ``_run_predict`` uses it
            to select the k-th answer candidate
        :return: list with one word sequence per input sequence
        :raises Exception: for an unsupported ``word_embed_type``
        """
        if self.word_embed_type == 'w2v':
            # NOTE(review): "povb2vocab" is kept verbatim - it must match the
            # request type PredictNetW2V expects.
            return [self._w2v_request("povb2vocab", data, prob_idx=prob_idx) for data in input_data]
        if self.word_embed_type == 'onehot':
            return [[self.onehot_encoder.get_vocab(row, prob_idx=prob_idx) for row in data]
                    for data in input_data]
        raise Exception("[Error] seq2seq train - word embeding : not defined type {0}".format(self.word_embed_type))

    def _get_vocab_size(self):
        """
        Return the vocabulary size of the linked word2vec network.

        :return: value returned by the word2vec "vocablen" request
        :raises Exception: when ``word_embed_type`` is not 'w2v'
        """
        if self.word_embed_type == 'w2v':
            return self._w2v_request("vocablen", {})
        raise Exception("[Error] seq2seq train - word embeding : not defined type {0}".format(self.word_embed_type))

    def _set_weight_vectors(self):
        """
        Create the softmax projection variables in the 'rnnlm' variable scope.

        Kept separate so the train and predict graphs create the same
        variables (``softmax_w`` [cell_size, vocab_size], ``softmax_b`` [vocab_size]).
        """
        with tf.variable_scope('rnnlm'):
            self.softmax_w = tf.get_variable("softmax_w", [self.cell_size, self.vocab_size])
            self.softmax_b = tf.get_variable("softmax_b", [self.vocab_size])

    def _set_train_model(self):
        """
        Build the training graph in the current default graph.

        Creates fixed-batch placeholders (``input_data``, ``output_data``,
        ``targets``), an ``encoder_num_layers``-deep dropout-wrapped LSTM passed
        to ``basic_rnn_seq2seq``, the softmax projection, the sequence loss,
        the reported ``cost`` and the Adam training op ``optm``.
        """
        cells = []
        for _ in range(self.encoder_num_layers):
            lstm = tf.contrib.rnn.BasicLSTMCell(self.cell_size)
            cells.append(tf.contrib.rnn.DropoutWrapper(lstm,
                                                       input_keep_prob=1.0,
                                                       output_keep_prob=self.drop_out))
        mul_cell = tf.contrib.rnn.MultiRNNCell(cells)

        self.input_data = tf.placeholder(tf.float32, [self.batch_size, self.encoder_seq_length, self.word_vector_size])
        self.output_data = tf.placeholder(tf.float32, [self.batch_size, self.decoder_seq_length, self.word_vector_size])
        self.targets = tf.placeholder(tf.int32, [self.batch_size, self.decoder_seq_length])
        self.istate = mul_cell.zero_state(self.batch_size, tf.float32)

        self._set_weight_vectors()

        # [batch, seq, dim] -> list of seq tensors of shape [batch, dim]
        inputs = [tf.squeeze(step, [1]) for step in tf.split(self.input_data, self.encoder_seq_length, 1)]
        outputs = [tf.squeeze(step, [1]) for step in tf.split(self.output_data, self.decoder_seq_length, 1)]
        self.outputs, last_state = tf.contrib.legacy_seq2seq.basic_rnn_seq2seq(inputs,
                                                                               outputs,
                                                                               mul_cell,
                                                                               dtype=tf.float32,
                                                                               scope='rnnlm')
        # rows are ordered b * decoder_seq_length + t, matching reshape(targets, [-1])
        self.output = tf.reshape(tf.concat(self.outputs, 1), [-1, self.cell_size])
        self.logits = tf.nn.xw_plus_b(self.output, self.softmax_w, self.softmax_b)
        self.probs = tf.nn.softmax(self.logits)

        # No num_decoder_symbols argument: in TF 1.x the 4th positional
        # parameter of sequence_loss is average_across_timesteps, which keeps
        # its default (True).
        self.loss = tf.contrib.legacy_seq2seq.sequence_loss(
            [self.logits],
            [tf.reshape(self.targets, [-1])],
            [tf.ones([self.batch_size * self.decoder_seq_length])])
        # cost is only reported by _run_train; the optimizer minimizes self.loss
        self.cost = tf.reduce_sum(self.loss) / self.batch_size / self.decoder_seq_length
        self.final_state = last_state
        _opt = tf.train.AdamOptimizer(learning_rate=self.learning_rate,
                                      beta1=0.9,
                                      beta2=0.999,
                                      epsilon=1e-08)
        self.optm = _opt.minimize(self.loss)

    def _run_train(self, sess, xbatch, ybatch, target):
        """
        Run one optimizer step on a single batch and print the batch cost.

        :param sess: training session
        :param xbatch: embedded encoder batch
        :param ybatch: embedded decoder-input batch
        :param target: vocab ids, shape [batch_size, decoder_seq_length]
        :return: ``sess``
        """
        state = sess.run(self.istate)
        train_loss, state, _ = sess.run([self.cost, self.final_state, self.optm],
                                        feed_dict={self.input_data: xbatch,
                                                   self.output_data: ybatch,
                                                   self.targets: target,
                                                   self.istate: state})
        print("[{0}] train_loss : {1}".format(self.epoch, train_loss))
        return sess

    def _set_predict_model(self):
        """
        Build the prediction graph (batch of ``predict_batch``) in the default graph.

        Same architecture as the training graph but without dropout. Also
        creates ``init_val`` (variable initialiser) and ``saver`` for restoring
        checkpoints.
        """
        if self.word_embed_type == 'onehot':
            # presumably stops the encoder from adding new words at predict time
            self.onehot_encoder.off_edit_mode()

        cells = [tf.contrib.rnn.BasicLSTMCell(self.cell_size) for _ in range(self.encoder_num_layers)]
        self.mul_cell = tf.contrib.rnn.MultiRNNCell(cells)
        self.input_data = tf.placeholder(tf.float32, [self.predict_batch, self.encoder_seq_length, self.word_vector_size])
        self.output_data = tf.placeholder(tf.float32, [self.predict_batch, self.decoder_seq_length, self.word_vector_size])
        self.istate = self.mul_cell.zero_state(self.predict_batch, tf.float32)

        self._set_weight_vectors()

        inputs = [tf.squeeze(step, [1]) for step in tf.split(self.input_data, self.encoder_seq_length, 1)]
        outputs = [tf.squeeze(step, [1]) for step in tf.split(self.output_data, self.decoder_seq_length, 1)]
        self.outputs, self.last_state = tf.contrib.legacy_seq2seq.basic_rnn_seq2seq(inputs,
                                                                                    outputs,
                                                                                    self.mul_cell,
                                                                                    dtype=tf.float32,
                                                                                    scope='rnnlm')
        self.output = tf.reshape(tf.concat(self.outputs, 1), [-1, self.cell_size])
        self.logits = tf.nn.xw_plus_b(self.output, self.softmax_w, self.softmax_b)
        self.probs = tf.nn.softmax(self.logits)
        self.init_val = tf.global_variables_initializer()
        self.saver = tf.train.Saver(tf.global_variables())

    def _run_predict(self, sess, x_input, type='raw', clean_ans=True, predict_num=0, batch_ver='eval', saver=None):
        """
        Restore a checkpoint into ``sess`` and decode one input sequence.

        :param sess: session on a graph built by _set_predict_model
        :param x_input: raw sentence when ``type == 'raw'`` (POS-tagged and
            padded here); an already tokenised word list when ``type == 'pre'``
        :param type: 'raw' or 'pre'
        :param clean_ans: True -> each response is a cleaned string;
            False -> each response is the list of decoded words
        :param predict_num: number of extra candidates; ``predict_num + 1``
            responses are returned, candidate k decoded with ``prob_idx=k``
        :param batch_ver: 'eval' restores the eval batch, anything else the active batch
        :param saver: Saver used for the restore; a new one is created when None
        :return: list of responses
        :raises Exception: when no trained model exists or ``type`` is unknown
        """
        if saver is None:
            saver = tf.train.Saver(tf.global_variables())
        if batch_ver == 'eval':
            batch_ver_name = self.get_eval_batch(self.node_id)
        else:
            batch_ver_name = self.get_active_batch(self.node_id)
        if not self.check_batch_exist(self.node_id):
            raise Exception("error : no pretrained model exist")
        saver.restore(sess, ''.join([self.md_store_path, '/', batch_ver_name, '/']))

        if type == 'raw':
            word_list = [self._pos_tag_predict_data(x_input)]
        elif type == 'pre':
            word_list = [x_input]
        else:
            raise Exception("Wrong predict data type error!")

        # decoder input: '@' marker followed by empty slots
        decoder_input = ['@'] + [''] * (self.decoder_seq_length - 1)
        # self.istate is zero_state(predict_batch == 1); evaluating it instead of
        # building a new zero_state op keeps repeated calls from adding ops to
        # the (possibly cached) graph
        state = sess.run(self.istate)
        probs = sess.run(self.probs,
                         feed_dict={self.input_data: self._word_embed_data(np.array(word_list)),
                                    self.output_data: self._word_embed_data(np.array([decoder_input])),
                                    self.istate: state})

        responses = []
        for idx in range(predict_num + 1):
            if clean_ans:
                response = ""
                start_flag = False
                for i in range(self.decoder_seq_length):
                    word = self._get_index2vocab(np.array([[probs[i]]]), prob_idx=idx)[0][0]
                    response, keep_going, start_flag = self._clean_predict_result(word, response, start_flag)
                    if not keep_going:
                        break
            else:
                response = [self._get_index2vocab(np.array([[probs[i]]]), prob_idx=idx)[0][0]
                            for i in range(self.decoder_seq_length)]
            responses.append(response)
        return responses

    def _pad_predict_input(self, input_tuple):
        """
        Left-pad/truncate POS-tagged input to ``encoder_seq_length`` tokens.

        Keeps at most ``encoder_seq_length - 1`` tokens, left-pads with
        ('#', '') and always appends the ('SF', '') end marker.

        :param input_tuple: list of (surface, tag) pairs
        :return: padded list of (surface, tag) pairs
        """
        keep = self.encoder_seq_length - 1
        # a negative repeat count yields [], so long inputs get no padding
        pad_size = keep - len(input_tuple)
        return [('#', '')] * max(pad_size, 0) + input_tuple[0:keep] + [('SF', '')]

    def _pos_tag_predict_data(self, x_input):
        """
        POS-tag a raw sentence with Mecab and format it as encoder tokens.

        Tagged tokens become 'surface/TAG'; padding tokens (empty tag) stay as
        the bare surface string.

        :param x_input: raw sentence
        :return: list of token strings padded/truncated by _pad_predict_input
        """
        # one tagger per instance; constructing Mecab on every call is
        # presumably costly (dictionary load)
        mecab = getattr(self, '_mecab', None)
        if mecab is None:
            mecab = Mecab(self._MECAB_DIC_PATH)
            self._mecab = mecab
        return [''.join([surface, '/', tag]) if len(tag) > 0 else surface
                for surface, tag in self._pad_predict_input(mecab.pos(x_input))]

    def _clean_predict_result(self, word, respone, start_flag):
        """
        Fold one decoded word into the cleaned answer string.

        :param word: decoded token, e.g. 'surface/TAG' or a marker such as '@'
        :param respone: answer string built so far
        :param start_flag: whether a start marker ('START' or '@') was seen
        :return: (updated answer, keep-decoding flag, updated start_flag)
        """
        if word in ('START', '@'):
            start_flag = True
        if (word not in ('PAD', 'UNKNOWN', 'START', '#', '@', 'SF', '.')
                and start_flag == True and len(word) > 0):
            # keep only the surface part of 'surface/TAG' tokens
            return respone + ' ' + word.split('/')[0], True, start_flag
        # NOTE(review): './SF' and '?/SF' are not in the exclusion list above, so
        # once start_flag is True they are appended ('.' / '?') and decoding
        # continues; this stop check only fires for them before a start marker.
        # Confirm whether decoding was meant to stop there.
        if word in ('SF', './SF', '?/SF'):
            return respone, False, start_flag
        return respone, True, start_flag