from cluster.neuralnet.neuralnet_node import NeuralNetNode
from master.workflow.netconf.workflow_netconf_wdnn import WorkFlowNetConfWdnn
from master.workflow.data.workflow_data_frame import WorkFlowDataFrame
import pandas as pd
import tensorflow as tf
import json
import os
from master.workflow.dataconf.workflow_dataconf_frame import WorkflowDataConfFrame
from cluster.common.neural_common_wdnn import NeuralCommonWdnn
from cluster.preprocess.pre_node_feed_fr2wdnn import PreNodeFeedFr2Wdnn
from common import utils
import math
from master.workflow.dataconf.workflow_dataconf_frame import WorkflowDataConfFrame as wf_data_conf
from master.workflow.data.workflow_data_frame import WorkFlowDataFrame as wf_data_node
from cluster.common.train_summary_info import TrainSummaryInfo
#from tensorflow.python.platform import tf_logging as logging
import logging
import random
import shutil, errno
from sklearn.preprocessing import LabelEncoder
from keras.models import Sequential
from keras.layers import Activation, Dense, BatchNormalization
import keras
from sklearn.preprocessing import MinMaxScaler
from keras.callbacks import ReduceLROnPlateau, CSVLogger, EarlyStopping
import numpy as np
class History(keras.callbacks.Callback):
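"""Keras callback that records the loss and accuracy reported at the end of every training batch."""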
def on_train_begin(self, logs={}):
self.losses = []
self.acc = []
def on_batch_end(self, batch, logs={}):
self.losses.append(str(logs.get('loss')))
self.acc.append(str(logs.get('acc')))
class NeuralNetNodeKerasdnn(NeuralNetNode):
"""
Workflow node that trains, evaluates, and predicts with a Keras DNN model.
"""
def run(self, conf_data):
"""
Keras DNN model training
:param conf_data: workflow configuration for this node (node_id, cls_pool, ...)
:return: None
"""
logging.info("NeuralNetNodeKerasdnn Run called")
try:
self._init_node_parm(conf_data['node_id'])
self.cls_pool = conf_data['cls_pool'] # Data feeder
self.train_batch, self.batch = self.make_batch(conf_data['node_id']) #makebatch
self.before_train_batch = self.get_before_make_batch(conf_data['node_id'], self.batch) #before train batch
if self.before_train_batch is not None:
self.model_train_before_path = ''.join([self.model_path + '/' + str(self.before_train_batch.nn_batch_ver_id)])
if self.train_batch is None:
self.model_train_path = ''.join([self.model_path + '/' + self.batch])
else:
self.model_train_path = ''.join([self.model_path + '/' + self.train_batch])
# model file copy
if self.before_train_batch is not None:
src = self.model_train_before_path
dst = self.model_train_path
utils.copy_all(src, dst)
logging.info("model_path : {0} ".format(self.model_path))
logging.info("hidden_layers : {0} ".format(self.hidden_layers))
logging.info("activation_function : {0} ".format(self.activation_function))
logging.info("batch_size : {0} ".format(self.batch_size))
logging.info("epoch : {0} ".format(self.epoch))
logging.info("model_type : {0} ".format(self.model_type))
data_conf_info = self.data_conf
#feed
# TODO: what should happen when there are multiple files?
# get prev node for load data
data_node_name = self._get_backward_node_with_type(conf_data['node_id'], 'preprocess')
train_data_set = self.cls_pool[data_node_name[0]] #get filename
file_queue = str(train_data_set.input_paths[0]) #get file_name
# TODO: eval is not working, so the data is not being loaded
_batch_size = self.batch_size
_num_tfrecords_files = 0
#_batch_size = 2
data_set = train_data_set[0 : train_data_set.data_size() ]
input_dims = len(data_set.columns) - 1  # exclude the label column
# model = Sequential()
# model.add(Dense(100, input_dim=input_dims, activation='relu'))
# #model.add(Dense(50))
# model.add(Dense(50, activation='sigmoid'))
# #model.add(Dense(50, activation='sigmoid'))
# model.compile(optimizer='rmsprop',
# loss='binary_crossentropy',
# metrics=['accuracy'])
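# Small fully-connected network: one hidden layer with batch normalization and a single sigmoid output for binary classification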
model = Sequential()
model.add(Dense(10, input_dim=input_dims, kernel_initializer='uniform'))
model.add(BatchNormalization())
model.add(Activation('sigmoid'))
#model.add(Dense(10, init='uniform'))
#model.add(BatchNormalization())
#model.add(Activation('sigmoid'))
model.add(Dense(1, kernel_initializer='uniform'))
model.add(BatchNormalization())
model.add(Activation('sigmoid'))
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=np.sqrt(0.1), cooldown=0, patience=5, min_lr=0.5e-6)
early_stopper = EarlyStopping(monitor='val_acc', min_delta=0.001, patience=10)
csv_logger = CSVLogger('resnet18_cifar10.csv')
history = History()
# Compile model
model.compile(loss='binary_crossentropy', optimizer='sgd')
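# NOTE: the configured batch size is overridden with a fixed value here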
self.batch_size = 4
#multi Feeder modified
multi_read_flag = self.multi_read_flag
if multi_read_flag == False:
# TODO: H5
# train once per h5 file in the folder
while(train_data_set.has_next()) :
logging.info("start keras dnn")
# each time one file is processed:
# loop over the file using the batch size and the total row count, handling whatever is left over at the end
# the feeder exposes a fn that returns one batch per iteration via __getitem__
# the input function checks the multi flag and separates continuous and categorical columns (does this need to happen every batch?)
# -> then fit the model
#for x in range(0, self.iter_size):
for i in range(0, train_data_set.data_size(), self.batch_size):
#Last batch size preprocessing
if (i + self.batch_size > train_data_set.data_size()):
i = i - (train_data_set.data_size() % self.batch_size) + 1
data_set = train_data_set[i:i + self.batch_size]
#data_set = train_data_set[0:train_data_set.data_size()]
logging.info(i)
X_train, targets = train_data_set.input_fn3(file_queue, data_set, data_conf_info)
#loss, accuracy =
model.fit(X_train, targets
, epochs=10
, validation_data=(X_train, targets)
, batch_size=self.batch_size
, callbacks = [lr_reducer, csv_logger, history]
)
#logging.info("keras training info loss : {0} , accuracy{1}".format(loss, accuracy))
# #Select Next file
train_data_set.next()
#os.makedirs(self.md_store_path + '/' + self.batch, exist_ok=True)
# keras.models.save_model(model, ''.join([self.md_store_path + '/' + self.batch, '/model.bin']))
print("end")
except Exception as e:
logging.error("Error Message : {0}".format(e))
raise Exception(e)
return None
def generator_len(self, it):
"""
Helper that returns the length of a generator (candidate for a shared util class)
:param it: python generator
:return: length of the generator
"""
return len(list(it))
def read_hdf5_chunk(self, filename):
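"""Read the 'table1' table from an HDF5 store chunk by chunk."""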
# type4 partial read
store = pd.HDFStore(filename)
nrows = store.get_storer('table1').nrows
chunksize = 100
for i in range(nrows // chunksize + 1):
chunk = store.select('table1',
start=i * chunksize,
stop=(i + 1) * chunksize)
store.close()
return chunk
def read_hdf5(self, filename):
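"""Read the whole 'table1' table from an HDF5 store into a DataFrame."""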
store = pd.HDFStore(filename)
#df = store.get_storer('table1')
df = store.select('table1')
store.close()
return df
def load_hdf5(self, data_path, dataframe):
"""
Store a dataframe as table 'table1' in an HDF5 file under data_path
:param data_path: directory in which the h5 file is written
:param dataframe: pandas dataframe to store
:return: None
"""
store_filepath_name = data_path + "/" + "adult.h5"
hdf = pd.HDFStore(store_filepath_name)
hdf.put('table1', dataframe, format='table', data_columns=True)
hdf.close()
def _init_node_parm(self, node_id):
return None
def _set_progress_state(self):
return None
def predict(self, nn_id, conf_data, parm={}):
# model build
#
# self._init_node_parm(nn_id)
# data_conf_info = WorkflowDataConfFrame(nn_id + "_" + ver + "_" + "dataconf_node").data_conf
try:
node_id = conf_data['node_id']
netconf = conf_data['net_conf']
dataconf = conf_data['data_conf']
# make wide & deep model
wdnn = NeuralCommonWdnn()
# wdnn_model = wdnn.wdnn_predict_build('wdnn', nn_id, netconf['hidden_layers'], netconf['activation_function'], '', netconf['model_path'], False)
wdnn_model = wdnn.wdnn_build('wdnn', nn_id, netconf['hidden_layers'],netconf['activation_function'],dataconf['data_conf'], netconf['model_path'], False)
label_column = list(dataconf['data_conf']["label"].keys())[0]
# data -> csv (pandas)
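# NOTE: the prediction input path is hard-coded here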
df = pd.read_csv( tf.gfile.Open('/hoya_src_root/adultest.data'),
# names=COLUMNS,
skipinitialspace=True,
engine="python")
# df['label'] = (df[label_column].apply(lambda x: "Y" in x)).astype(int)
# df['label'] = (df['income_bracket'].apply(lambda x: '>50K' in x)).astype(int)
# predict
# def input_fn(self, df, nnid, dataconf ):
predict_results = wdnn_model.predict(input_fn=lambda: wdnn.input_fn( df, nn_id, dataconf['data_conf']))
df['label'] = list(predict_results)
return None
except Exception as e:
raise Exception(e)
return None
def _init_node_parm(self, key):
"""
Init parameters from the workflow net conf and data conf
:param key: netconf node id
:return: None
"""
wf_net_conf = WorkFlowNetConfWdnn(key)
self.wf_state_id = wf_net_conf.get_state_id(key).pk
netconfig = wf_net_conf.get_view_obj(key)
self.model_path = wf_net_conf.model_path
self.hidden_layers = wf_net_conf.hidden_layers
self.activation_function = wf_net_conf.activation_function
self.batch_size = wf_net_conf.batch_size
self.epoch = wf_net_conf.epoch
self.model_type = wf_net_conf.model_type
# TODO: ask Seungwoo how this value should be retrieved
_wf_data_conf = wf_data_conf(key.split('_')[0]+'_'+key.split('_')[1]+'_'+'dataconf_node')
self.data_conf = _wf_data_conf.conf
self.label = _wf_data_conf.label
self.cell_feature = _wf_data_conf.cell_feature
self.cross_cell = _wf_data_conf.cross_cell
self.extend_cell_feature = _wf_data_conf.extend_cell_feature
self.label_values = _wf_data_conf.label_values
if 'test' in self.get_prev_node()[0].node_name:
_wf_data_node = wf_data_node(key.split('_')[0] + '_' + key.split('_')[1] + '_' + 'data_node')
self.multi_read_flag = _wf_data_node.multi_node_flag
else:
_wf_data_node = wf_data_node(key.split('_')[0] + '_' + key.split('_')[1] + '_' + 'data_node')
self.multi_read_flag = _wf_data_node.multi_node_flag
def eval(self, node_id, conf_data, data=None, result=None):
"""
Evaluate the trained model and collect the results
:param node_id: evaluation node id
:param conf_data: workflow configuration for this node
:return: TrainSummaryInfo holding original and predicted labels
"""
logging.info("eval_data")
self._init_node_parm(node_id.split('_')[0] + "_" + node_id.split('_')[1]+ "_" + "netconf_node")
self.cls_pool_all = conf_data['cls_pool'] # Data feeder
config = {"type": self.model_type, "labels": self.label_values, "nn_id":conf_data.get('nn_id'), "nn_wf_ver_id":conf_data.get('wf_ver')}
train = TrainSummaryInfo(conf=config)
print(config)
self.batch = self.get_eval_batch(node_id)
#print(train)
self.model_eval_path = ''.join([self.model_path + '/' + self.batch])
for _k, _v in self.cls_pool_all.items():
if 'test' in _k:
self.cls_pool = _v
if 'evaldata' in _k:
self.multi_node_flag = _v.multi_node_flag
#conf_data['cls_pool'].get('nn00001_1_pre_feed_fr2wdnn_test')
print("model_path : " + str(self.model_path))
print("hidden_layers : " + str(self.hidden_layers))
print("activation_function : " + str(self.activation_function))
print("batch_size : " + str(self.batch_size))
print("epoch : " + str(self.epoch))
print("model_type : " + str(self.model_type))
# data_store_path = WorkFlowDataFrame(conf_data['nn_id']+"_"+conf_data['wf_ver']+"_"+ "data_node").step_store
data_conf_info = self.data_conf
# make wide & deep model
wdnn = NeuralCommonWdnn()
wdnn_model = wdnn.wdnn_build(self.model_type, conf_data['node_id'], self.hidden_layers,
str(self.activation_function), data_conf_info, str(self.model_eval_path))
# feed
# TODO: what should happen when there are multiple files?
# get prev node for load data
#data_node_name = self._get_backward_node_with_type(conf_data['node_id'], 'preprocess')
#train_data_set = self.cls_pool[data_node_name[0]] # get filename
train_data_set = self.cls_pool # get filename
file_queue = str(train_data_set.input_paths[0]) # get file_name
# iterate over the files and load every row; need a way to get the total record count of a tfrecord
_batch_size = self.batch_size
_num_tfrecords_files = 0
# multi Feeder modified
multi_read_flag = self.multi_read_flag
# TODO: H5
# train once per h5 file in the folder
# if the multi_file flag is 'no', the default format is h5
try:
results = dict()
ori_list = list()
pre_list = list()
while (train_data_set.has_next()):
print("h5")
# each time one file is processed:
# loop over the file using the batch size and the total row count, handling whatever is left over at the end
# the feeder exposes a fn that returns one batch per iteration via __getitem__
# the input function checks the multi flag and separates continuous and categorical columns (does this need to happen every batch?)
# -> then fit the model
#
# # Iteration is to improve for Model Accuracy
# Per Line in file
# eval should be one line predict
#self.batch_size = 2
for i in range(0, train_data_set.data_size(), self.batch_size):
data_set = train_data_set[i:i + self.batch_size]
#if i == 0:
#eval_data_Set = data_set
# input_fn2(self, mode, data_file, df, nnid, dataconf):
predict_value = wdnn_model.predict(
input_fn=lambda: train_data_set.input_fn2(tf.contrib.learn.ModeKeys.TRAIN, file_queue,
data_set, data_conf_info))
data_set_count = len(data_set.index)
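# materialize the prediction generator so its length can be compared with the batch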
predict_val_list = [_pv for _pv in predict_value]
predict_val_count = len(predict_val_list)
if (data_set_count != predict_val_count):
logging.error("wdnn eval error check : dataframe count({0}) predict count({1})".format(data_set_count, predict_val_count))
raise ValueError(
'eval data validation check error : dataframe and predict count is different(neuralnet_node_wdnn.eval)')
data_set['predict_label'] = predict_val_list #list(predict_value)
#_predict = list(predict_value)
predict_y = list(data_set['predict_label'])
ori_list.extend(data_set[self.label].values.tolist())
pre_list.extend(list(data_set['predict_label']))
# model fitting
print(len(ori_list))
print(len(pre_list))
#logging.error("wdnn eval ori list : {0}".format(ori_list) )
logging.info("wdnn eval ori list : {0}".format(len(ori_list)) )
#logging.info("wdnn eval ori list : {0}".format('info'))
#logging.debug("wdnn eval ori list : {0}".format('debug'))
#logging.critical("wdnn eval ori list : {0}".format('critical'))
#print("model fitting h5 " + str(data_set))
# #Select Next file
train_data_set.next()
# TODO: move this earlier
train.set_nn_batch_ver_id(self.batch)
if self.model_type == "regression":
results['ori'] = ori_list
results['pre'] = pre_list
train.set_result_info(ori_list, pre_list)
if self.model_type == "category":
# for tfrecord the label has to be converted here (values come back as Tensor objects, so it cannot be done later); for H5 the feeder does the conversion
le = LabelEncoder()
le.fit(self.label_values)
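# map encoded predictions back to the original label values before recording the result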
for _i, _ori in enumerate(ori_list):
#return_value = self.labels[np.argmax(model.predict(X_train))]
train.set_result_info(str(_ori), str(le.inverse_transform(pre_list[_i])))
#return self.batch
except Exception as e:
print("eval error")
print(e)
raise Exception(e)
logging.info("eval end")
return train