Source code for cluster.neuralnet.neuralnet_node_autoencoder

from cluster.neuralnet.neuralnet_node import NeuralNetNode
from master.workflow.netconf.workflow_netconf_autoencoder import WorkFlowNetConfAutoEncoder as WFNetConfAuto
import tensorflow as tf
import numpy as np
from scipy import spatial
from common.utils import *
from konlpy.tag import Mecab
from common.graph.nn_graph_manager import NeuralNetModel
import logging

[docs]class NeuralNetNodeAutoEncoder(NeuralNetNode): """ this is a network class for Autoencoder Autoencoder provide two types of service 1. provide matrix size of input matrix 2. provide compressed matrix """
[docs] def run(self, conf_data): try: node_id = conf_data['node_id'] # get prev node for load data train_data_set = self.get_linked_prev_node_with_grp('preprocess')[0] # copy data feeder's parm to netconf self._copy_node_parms(train_data_set, self) # init parms self._init_node_parm(conf_data['node_id']) # set autoencoder graph self._set_train_model() # create tensorflow session init = tf.global_variables_initializer() with tf.Session() as sess : sess.run(init) saver = tf.train.Saver(tf.all_variables()) # load trained model if (self.check_batch_exist(conf_data['node_id'])): path = ''.join([self.md_store_path, '/', self.get_eval_batch(node_id), '/']) set_filepaths(path) saver.restore(sess, path) # feed data and train for _ in range(self.iter_size) : while (train_data_set.has_next()): for i in range(0, train_data_set.data_size(), self.batch_size): data_set = train_data_set[i:i + self.batch_size] if(len(data_set) >= self.batch_size) : self._run_train(sess, data_set) train_data_set.next() train_data_set.reset_pointer() # save model and close session path = ''.join([self.md_store_path, '/', self.make_batch(node_id)[1], '/']) set_filepaths(path) saver.save(sess, path) return node_id except Exception as e: logging.info("[Stacked AutoEncoder Train Process] : {0}".format(e)) raise Exception(e) finally : # copy data feeder's parm to netconf self._copy_node_parms(train_data_set, self)
def _set_train_model(self): """ set train model for autoencoder :return: """ try : self.x = tf.placeholder(tf.float32, shape=[self.batch_size, self.n_input], name='x') self.y = self.x W_encode = {} b_encode = {} W_decode = {} b_decode = {} encoder = {} decoder = {} self.n_hidden = [self.n_input] + self.n_hidden encoder_input = self.x for cnt in range(len(self.n_hidden)) : if(len(self.n_hidden) > cnt+1) : W_encode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt], self.n_hidden[cnt+1]])) b_encode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt+1]])) encoder[cnt] = tf.nn.sigmoid(tf.add(tf.matmul(encoder_input, W_encode[cnt]), b_encode[cnt])) encoder_input = encoder[cnt] decoder_input = encoder[cnt] for cnt in reversed(range(len(self.n_hidden))): if (cnt - 1 >= 0): W_decode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt], self.n_hidden[cnt-1]])) b_decode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt-1]])) decoder[cnt] = tf.nn.sigmoid(tf.add(tf.matmul(decoder_input, W_decode[cnt]), b_decode[cnt])) decoder_input = decoder[cnt] self.cost = tf.reduce_mean(tf.pow(self.y - decoder[1], 2)) self.optimizer = tf.train.RMSPropOptimizer(self.learning_rate).minimize(self.cost) self.init_val = tf.initialize_all_variables() self.saver = tf.train.Saver(tf.all_variables()) except Exception as e : raise Exception ("error on build autoencoder train graph") def _set_predict_model(self): """ set train model for autoencoder :return: """ try : self.x = tf.placeholder(tf.float32, shape=[1, self.n_input], name='x') self.y = self.x W_encode = {} b_encode = {} W_decode = {} b_decode = {} encoder = {} decoder = {} self.n_hidden = [self.n_input] + self.n_hidden encoder_input = self.x for cnt in range(len(self.n_hidden)) : if(len(self.n_hidden) > cnt+1) : W_encode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt], self.n_hidden[cnt+1]])) b_encode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt+1]])) encoder[cnt] = tf.nn.sigmoid(tf.add(tf.matmul(encoder_input, W_encode[cnt]), b_encode[cnt])) encoder_input = encoder[cnt] decoder_input = encoder[cnt] for cnt in reversed(range(len(self.n_hidden))): if (cnt - 1 >= 0): W_decode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt], self.n_hidden[cnt-1]])) b_decode[cnt] = tf.Variable(tf.random_normal([self.n_hidden[cnt-1]])) decoder[cnt] = tf.nn.sigmoid(tf.add(tf.matmul(decoder_input, W_decode[cnt]), b_decode[cnt])) decoder_input = decoder[cnt] self.comp_vec = encoder[len(self.n_hidden)-2] self.recov_vec = decoder[1] self.init_val = tf.initialize_all_variables() self.saver = tf.train.Saver(tf.all_variables()) except Exception as e : raise Exception ("error on build autoencoder train graph") def _run_train(self, sess, data_set): """ :return: """ try : _, cost_val = sess.run([self.optimizer, self.cost], feed_dict={self.x: data_set}) logging.info('Avg. cost = {0}'.format(cost_val / self.n_input)) except Exception as e : raise Exception ('autoencoder run_train step error : {0}'.foramt(e) ) def _get_node_parm(self, node_id): """ return conf master class :return: """ return WFNetConfAuto(node_id) def _init_node_parm(self, node_id): """ initialze parms for autoencoder :param node_id: :return: """ try: wf_conf = WFNetConfAuto(node_id) self.node_id = node_id self.md_store_path = wf_conf.get_model_store_path() self.iter_size = wf_conf.get_iter_size() self.batch_size = wf_conf.get_batch_size() self.learning_rate = wf_conf.get_learn_rate() self.embed_type = wf_conf.get_embed_type() self.pre_type = wf_conf.get_feeder_pre_type() self.n_hidden = wf_conf.get_n_hidden() if (wf_conf.get_n_input()): self.n_input = wf_conf.get_n_input() else : raise Exception ("input number is required! ") if(self.pre_type in ['frame']) : if (self.embed_type == 'onehot'): self.encode_onehot = {} self.word_vector_size = wf_conf.get_vocab_size() + 4 self.encode_col = wf_conf.get_encode_column() self.encode_dtype = wf_conf.get_encode_dtype() if (wf_conf.get_vocab_list()): encoder_value_list = wf_conf.get_vocab_list() for col_name in list(encoder_value_list.keys()): self.encode_onehot[col_name] = OneHotEncoder(self.word_vector_size) self.encode_onehot[col_name].restore(encoder_value_list.get(col_name)) elif(self.pre_type in ['mecab', 'twitter', 'kkma']): if(self.embed_type == 'onehot') : self.word_len = wf_conf.get_encode_len() self.word_vector_size = wf_conf.get_vocab_size() + 4 self.onehot_encoder = OneHotEncoder(self.word_vector_size) if (wf_conf.get_vocab_list()): self.onehot_encoder.restore(wf_conf.get_vocab_list()) except Exception as e : raise Exception (e)
[docs] def predict(self, node_id, parm = {"input_data" : {}, "type": "encoder"}, internal=False, raw_flag=False): """ :param node_id: :param parm: :return: """ try : # get unique key unique_key = '_'.join([node_id , self.get_eval_batch(node_id)]) # set init params self._init_node_parm(node_id) ## create tensorflow graph if (NeuralNetModel.dict.get(unique_key)): self = NeuralNetModel.dict.get(unique_key) graph = NeuralNetModel.graph.get(unique_key) else: self._set_predict_model() NeuralNetModel.dict[unique_key] = self NeuralNetModel.graph[unique_key] = tf.get_default_graph() graph = tf.get_default_graph() # off onehot to add dict on predict time if (raw_flag) : input_arr = parm['input_data'] elif (self.pre_type in ['frame']): input_arr = [] if (self.embed_type == 'onehot'): for col_name in self.encode_col : if(self.encode_dtype[col_name] != 'object') : input_arr = input_arr + [int(parm['input_data'].get(col_name))] elif(col_name in list(parm['input_data'].keys())) : input_arr = input_arr + self.encode_onehot[col_name].get_vector(parm['input_data'].get(col_name)).tolist() else : input_arr = input_arr + self.encode_onehot[col_name].get_vector('').tolist() input_arr = [input_arr] elif (self.pre_type in ['mecab', 'twitter', 'kkma']): if (self.embed_type == 'onehot'): self.onehot_encoder.off_edit_mode() input_arr = [self._pos_tag_predict_data(parm['input_data'], self.word_len)] input_arr = self._word_embed_data(self.embed_type, np.array(input_arr)) else : raise Exception ("AutoEncoder : Unknown embed type error ") with tf.Session(graph=graph) as sess : sess.run(self.init_val) # load trained model if (self.check_batch_exist(self.node_id)): path = ''.join([self.md_store_path, '/', self.get_eval_batch(node_id), '/']) set_filepaths(path) self.saver.restore(sess, path) else: raise Exception("Autoencoder error : no pretrained model exist") # decide which layer to return if (parm['type'] == 'encoder'): result = sess.run(self.comp_vec, feed_dict={self.x: input_arr}) if (parm['type'] == 'decoder'): result = sess.run(self.recov_vec, feed_dict={self.x: input_arr}) # for eval purpose return original data together if (internal) : return result.tolist(), input_arr else : return result.tolist() except Exception as e : raise Exception (e) finally : sess.close()
[docs] def anomaly_detection(self, node_id, parm = {"input_data" : {}, "type": "encoder"}, raw_flag=False): """ this is a function that judge requested data is out lier of not :param node_id: string :param parm: dict (include input data) :return: boolean """ try : parm['type'] = 'decoder' out_data, in_data = self.predict(node_id, parm, internal=True, raw_flag=raw_flag) dist = 1 - spatial.distance.cosine(in_data[0], out_data[0]) return dist except Exception as e : raise Exception ("error on anomaly detection : {0}".format(e))
def _set_progress_state(self): return None
[docs] def eval(self, node_id, conf_data, data=None, result=None, stand=0.1): """ eval process check if model works well (accuracy with cross table) :param node_id: :param conf_data: :param data: :param result: :return: """ try : node_id = self.get_node_name() result.set_result_data_format(None) result.set_nn_batch_ver_id(self.get_eval_batch(node_id)) # prepare net conf tf.reset_default_graph() while (data.has_next()): for i in range(0, data.data_size(), 1): data_set = data[i:i + 1] parm = {"input_data": data_set, "type": "decoder"} dist = self.anomaly_detection(node_id, parm, raw_flag=True) result.set_result_info([str(stand)], [str(dist)]) data.next() return result except Exception as e : raise Exception ("error on eval wcnn : {0}".format(e))