Source code for cluster.neuralnet.neuralnet_node_residual

from cluster.neuralnet.neuralnet_node import NeuralNetNode
from common.utils import *
from master.workflow.netconf.workflow_netconf_cnn import WorkFlowNetConfCNN
from master.workflow.init.workflow_init_simple import WorkFlowSimpleManager
import tensorflow as tf
import numpy as np
import os
import operator
import datetime, logging
from cluster.common.train_summary_info import TrainSummaryInfo
import keras
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ReduceLROnPlateau, CSVLogger, EarlyStopping
from cluster.neuralnet import resnet
from common.graph.nn_graph_manager import NeuralNetModel
from cluster.common.train_summary_accloss_info import TrainSummaryAccLossInfo

class History(keras.callbacks.Callback):
    """Keras callback that records per-batch loss and accuracy as strings."""

    def on_train_begin(self, logs={}):
        self.losses = []
        self.acc = []

    def on_batch_end(self, batch, logs={}):
        self.losses.append(str(logs.get('loss')))
        self.acc.append(str(logs.get('acc')))
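# Usage sketch (illustrative; `model`, `x_train`, `y_train` are hypothetical
# placeholders, not part of this module): the History callback collects
# per-batch loss/accuracy when passed to a Keras fit() call.
#
#     history_cb = History()
#     model.fit(x_train, y_train, batch_size=32, epochs=1, callbacks=[history_cb])
#     logging.info(history_cb.losses)  # e.g. ['2.301', '2.187', ...]
#     logging.info(history_cb.acc)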
class NeuralNetNodeReNet(NeuralNetNode):
    """Train, evaluate and predict node for residual networks (ResNet) built with Keras."""

    def _init_train_parm(self, conf_data):
        # get initial value
        self.conf_data = conf_data
        self.cls_pool = conf_data["cls_pool"]
        self.nn_id = conf_data["nn_id"]
        self.wf_ver = conf_data["wf_ver"]
        self.node_id = conf_data["node_id"]
        self.node = WorkFlowSimpleManager().get_train_node()

        # get feed name
        self.train_feed_name = self.nn_id + "_" + self.wf_ver + "_" + WorkFlowSimpleManager().get_train_feed_node()
        self.eval_feed_name = self.nn_id + "_" + self.wf_ver + "_" + WorkFlowSimpleManager().get_eval_feed_node()
        self.feed_node = self.get_prev_node()

    def _init_value(self):
        self.g_train_cnt = 0
        self.file_end = '.bin'
        self.train_return_data = {}
        self.train_return_arr = ["Training .................................................."]

    ####################################################################################################################
    def _set_netconf_parm(self):
        netconf = WorkFlowNetConfCNN().get_view_obj(self.node_id)
        try:
            netconf = WorkFlowNetConfCNN().set_num_classes_predcnt(self.nn_id, self.wf_ver, self.node,
                                                                   self.node_id, netconf)
        except Exception:
            pass
        self.netconf = netconf

        try:
            self.train_cnt = self.netconf["param"]["traincnt"]
            self.epoch = self.netconf["param"]["epoch"]
            self.batch_size = self.netconf["param"]["batch_size"]
            self.model_path = self.netconf["modelpath"]
            self.modelname = self.netconf["modelname"]
        except Exception as e:
            logging.info("NetConf does not exist.")
            logging.info(e)

    def _set_dataconf_parm(self, dataconf):
        self.dataconf = dataconf

    ####################################################################################################################
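# A minimal sketch of the netconf/dataconf dictionaries that _set_netconf_parm,
# get_model_resnet and eval_run read. The keys are taken from the accesses in
# this module; the values shown are hypothetical placeholders.
#
#     netconf = {
#         "modelpath": "<model directory>",
#         "modelname": "<model name>",
#         "labels": ["<label 1>", "<label 2>"],
#         "param": {"traincnt": 1, "epoch": 10, "batch_size": 100,
#                   "augmentation": "N", "predictcnt": 5, "predictlog": "N"},
#         "config": {"num_classes": 2, "layeroutputs": 18,
#                    "optimizer": "adam", "eval_type": "category"},
#     }
#     dataconf = {"preprocess": {"x_size": 32, "y_size": 32, "channel": 3}}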
    def set_saver_model(self):
        self.save_path = self.model_path + "/" + str(self.batch) + str(self.file_end)
        keras.models.save_model(self.model, self.save_path)

        loss = round(self.loss * 100, 2)
        accR = round(self.acc * 100, 2)
        val_loss = round(self.val_loss * 100, 2)
        val_acc = round(self.val_acc * 100, 2)

        msg = "Global Step: " + str(self.g_train_cnt)
        msg += ", Training Loss: " + str(loss) + "%" + ", Training Accuracy: " + str(accR) + "%"
        msg += ", Test Loss: " + str(val_loss) + "%" + ", Test Accuracy: " + str(val_acc) + "%"
        logging.info(msg)

        config = {"nn_id": self.nn_id, "nn_wf_ver_id": self.wf_ver, "nn_batch_ver_id": self.batch}
        result = TrainSummaryAccLossInfo(config)
        result.loss_info["loss"] = str(val_loss)
        result.acc_info["acc"] = str(val_acc)
        self.save_accloss_info(result)

        result = [msg]
        # self.model_file_delete(self.model_path, self.modelname)
        self.train_return_arr.append(result)

        self.eval(self.node_id, self.conf_data, None, None)
    def get_model_resnet(self):
        try:
            keras.backend.tensorflow_backend.clear_session()
            self.lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=np.sqrt(0.1), cooldown=0, patience=5,
                                                min_lr=0.5e-6)
            self.early_stopper = EarlyStopping(monitor='val_acc', min_delta=0.001, patience=10)
            self.csv_logger = CSVLogger('resnet.csv')

            num_classes = self.netconf["config"]["num_classes"]
            numoutputs = self.netconf["config"]["layeroutputs"]
            x_size = self.dataconf["preprocess"]["x_size"]
            y_size = self.dataconf["preprocess"]["y_size"]
            channel = self.dataconf["preprocess"]["channel"]
            optimizer = self.netconf["config"]["optimizer"]

            filelist = os.listdir(self.model_path)
            filelist.sort(reverse=True)

            last_chk_path = self.model_path + "/" + self.load_batch + self.file_end

            try:
                self.model = keras.models.load_model(last_chk_path)
                logging.info("Train restored checkpoint from: " + last_chk_path)
            except Exception as e:
                # No saved model to restore: build a fresh ResNet of the configured depth.
                if numoutputs == 18:
                    self.model = resnet.ResnetBuilder.build_resnet_18((channel, x_size, y_size), num_classes)
                elif numoutputs == 34:
                    self.model = resnet.ResnetBuilder.build_resnet_34((channel, x_size, y_size), num_classes)
                elif numoutputs == 50:
                    self.model = resnet.ResnetBuilder.build_resnet_50((channel, x_size, y_size), num_classes)
                elif numoutputs == 101:
                    self.model = resnet.ResnetBuilder.build_resnet_101((channel, x_size, y_size), num_classes)
                elif numoutputs == 152:
                    self.model = resnet.ResnetBuilder.build_resnet_152((channel, x_size, y_size), num_classes)
                elif numoutputs == 200:
                    self.model = resnet.ResnetBuilder.build_resnet_200((channel, x_size, y_size), num_classes)
                logging.info("No checkpoint to restore; initializing variables instead. " + last_chk_path)
                logging.info(e)

            self.model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
        except Exception as e:
            logging.error("===Error on Residualnet build model : {0}".format(e))
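# Shape-convention sketch (hypothetical sizes): resnet.ResnetBuilder.build_resnet_*
# takes the input shape as (channel, rows, cols) plus the class count and returns
# an uncompiled Keras model, which is then compiled as above.
#
#     model = resnet.ResnetBuilder.build_resnet_18((3, 32, 32), 10)
#     model.compile(loss='categorical_crossentropy', optimizer='adam',
#                   metrics=['accuracy'])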
####################################################################################################################
    def train_run_resnet(self, input_data, test_data):
        data_augmentation = self.netconf["param"]["augmentation"]
        try:
            if data_augmentation == "N" or data_augmentation == "n":
                logging.info('Not using data augmentation.')
            else:
                logging.info('Using real-time data augmentation.')

            while (input_data.has_next()):
                data_set = input_data[0:input_data.data_size()]
                x_batch, y_batch, n_batch = self.get_batch_img_data(data_set, "T")
                test_set = test_data[0:test_data.data_size()]
                x_tbatch, y_tbatch, n_tbatch = self.get_batch_img_data(test_set, "T")

                for i in range(self.train_cnt):
                    if data_augmentation == "N" or data_augmentation == "n":
                        history = self.model.fit(x_batch, y_batch,
                                                 batch_size=self.batch_size,
                                                 epochs=self.epoch,
                                                 validation_data=(x_tbatch, y_tbatch),
                                                 shuffle=True,
                                                 callbacks=[self.lr_reducer, self.early_stopper, self.csv_logger])
                    else:
                        # This will do preprocessing and realtime data augmentation:
                        datagen = ImageDataGenerator(
                            featurewise_center=False,             # set input mean to 0 over the dataset
                            samplewise_center=False,              # set each sample mean to 0
                            featurewise_std_normalization=False,  # divide inputs by std of the dataset
                            samplewise_std_normalization=False,   # divide each input by its std
                            zca_whitening=False,                  # apply ZCA whitening
                            rotation_range=0,                     # randomly rotate images in the range (degrees, 0 to 180)
                            width_shift_range=0.1,                # randomly shift images horizontally (fraction of total width)
                            height_shift_range=0.1,               # randomly shift images vertically (fraction of total height)
                            horizontal_flip=True,                 # randomly flip images
                            vertical_flip=False)                  # randomly flip images

                        # Compute quantities required for featurewise normalization
                        # (std, mean, and principal components if ZCA whitening is applied).
                        datagen.fit(x_batch)

                        # Fit the model on the batches generated by datagen.flow().
                        history = self.model.fit_generator(datagen.flow(x_batch, y_batch, batch_size=self.batch_size),
                                                           steps_per_epoch=x_batch.shape[0] // self.batch_size,
                                                           validation_data=(x_tbatch, y_tbatch),
                                                           epochs=self.epoch, verbose=1, max_q_size=100,
                                                           callbacks=[self.lr_reducer, self.early_stopper, self.csv_logger])

                    self.loss = history.history["loss"][0]
                    self.acc = history.history["acc"][0]
                    self.val_loss = history.history["val_loss"][0]
                    self.val_acc = history.history["val_acc"][0]

                    self.g_train_cnt += 1
                    logging.info("Save Train Count=" + str(self.g_train_cnt))
                    self.set_saver_model()

                input_data.next()
        except Exception as e:
            logging.info("Error[400] ..............................................")
            logging.info(e)
    def run(self, conf_data):
        try:
            logging.info("run NeuralNetNodeResnet Train")
            # init data setup
            self._init_train_parm(conf_data)
            self._init_value()

            # get data & dataconf
            test_data, dataconf = self.get_input_data(self.feed_node, self.cls_pool, self.eval_feed_name)
            input_data, dataconf = self.get_input_data(self.feed_node, self.cls_pool, self.train_feed_name)

            # set netconf, dataconf
            self._set_netconf_parm()
            self._set_dataconf_parm(dataconf)

            # set batch
            self.load_batch = self.get_eval_batch(self.node_id)
            if self.epoch != 0 and self.train_cnt != 0:
                self.train_batch, self.batch = self.make_batch(self.node_id)
            else:
                self.batch = self.load_batch

            self.get_model_resnet()
            self.train_run_resnet(input_data, test_data)

            self.train_return_data["TrainResult"] = self.train_return_arr

            if self.epoch == 0 or self.train_cnt == 0:
                self.eval(self.node_id, self.conf_data, None, None)

            return self.train_return_data
        except Exception as e:
            logging.info("===Error on running residualnet : {0}".format(e))
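# A sketch of the conf_data dict that run() expects, based on the keys read in
# _init_train_parm (values are hypothetical placeholders):
#
#     conf_data = {
#         "nn_id": "<network id>",
#         "wf_ver": "<workflow version>",
#         "node_id": "<node id>",
#         "cls_pool": {...},  # feeder node instances, keyed by feed name
#     }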
####################################################################################################################
    def eval_run(self, input_data):
        self.batch_size = self.netconf["param"]["batch_size"]
        labels = self.netconf["labels"]
        pred_cnt = self.netconf["param"]["predictcnt"]
        try:
            predlog = self.netconf["param"]["predictlog"]
        except Exception:
            predlog = "N"
        # logging.info(labels)

        # true/false counters per label
        t_cnt_arr = []
        f_cnt_arr = []
        for i in range(len(labels)):
            t_cnt_arr.append(0)
            f_cnt_arr.append(0)

        input_data.pointer = 0

        # eval
        config = {"type": self.netconf["config"]["eval_type"],
                  "labels": self.netconf["labels"],
                  "nn_id": self.nn_id,
                  "nn_wf_ver_id": self.wf_ver,
                  "nn_batch_ver_id": self.batch}
        self.eval_data = TrainSummaryInfo(conf=config)

        while (input_data.has_next()):
            data_set = input_data[0:input_data.data_size()]
            x_batch, y_batch, n_batch = self.get_batch_img_data(data_set, "E")

            try:
                logits = self.model.predict(x_batch)

                for i in range(len(logits)):
                    true_name = y_batch[i]
                    file_name = n_batch[i]
                    logit = []
                    logit.append(logits[i])

                    idx = labels.index(true_name)
                    return_data = self.set_predict_return_cnn_img(labels, logit, pred_cnt)
                    pred_name = return_data["key"][0]

                    if self.eval_flag == "E":
                        # exact-match evaluation: only the top-1 prediction counts
                        if true_name == pred_name:
                            t_cnt_arr[idx] = t_cnt_arr[idx] + 1
                            strLog = "[True] : "
                            if (predlog == "TT"):
                                logging.info(strLog + true_name + " FileName=" + file_name)
                                logging.info(return_data["key"])
                                logging.info(return_data["val"])
                        else:
                            f_cnt_arr[idx] = f_cnt_arr[idx] + 1
                            strLog = "[False] : "
                            if (predlog == "FF"):
                                logging.info(strLog + true_name + " FileName=" + file_name)
                                logging.info(return_data["key"])
                                logging.info(return_data["val"])
                        if (predlog == "AA"):
                            logging.info(strLog + true_name + " FileName=" + file_name)
                            logging.info(return_data["key"])
                            logging.info(return_data["val"])
                    else:
                        # top-N evaluation: a hit anywhere in the returned keys counts as true
                        try:
                            listTF = return_data["key"].index(true_name)  # raises ValueError when not found
                            t_cnt_arr[idx] = t_cnt_arr[idx] + 1
                            strLog = "[True] : "
                            if (predlog == "T"):
                                logging.info(strLog + true_name + " FileName=" + file_name)
                                logging.info(return_data["key"])
                                logging.info(return_data["val"])
                        except Exception:
                            f_cnt_arr[idx] = f_cnt_arr[idx] + 1
                            strLog = "[False] : "
                            if (predlog == "F"):
                                logging.info(strLog + true_name + " FileName=" + file_name)
                                logging.info(return_data["key"])
                                logging.info(return_data["val"])
                        if (predlog == "A"):
                            logging.info(strLog + true_name + " FileName=" + file_name)
                            logging.info(return_data["key"])
                            logging.info(return_data["val"])

                    self.eval_data.set_result_info(true_name, pred_name)
            except Exception as e:
                logging.info(e)
                logging.info("No checkpoint to restore; initializing variables instead.")

            input_data.next()

        # set parms for db store
        input_data = TrainSummaryInfo.save_result_info(self, self.eval_data)

        self.eval_print(labels, t_cnt_arr, f_cnt_arr)
    def eval_print(self, labels, t_cnt_arr, f_cnt_arr):
        logging.info("####################################################################################################")
        result = []
        strResult = "['Eval ......................................................']"
        result.append(strResult)

        totCnt = 0
        tCnt = 0
        fCnt = 0
        for i in range(len(labels)):
            strResult = "Category : " + self.spaceprint(labels[i], 15) + " "
            strResult += "TotalCnt=" + self.spaceprint(str(t_cnt_arr[i] + f_cnt_arr[i]), 8) + " "
            strResult += "TrueCnt=" + self.spaceprint(str(t_cnt_arr[i]), 8) + " "
            strResult += "FalseCnt=" + self.spaceprint(str(f_cnt_arr[i]), 8) + " "
            if t_cnt_arr[i] + f_cnt_arr[i] != 0:
                strResult += "True Percent(TrueCnt/TotalCnt*100)=" + str(
                    round(t_cnt_arr[i] / (t_cnt_arr[i] + f_cnt_arr[i]) * 100)) + "%"
            totCnt += t_cnt_arr[i] + f_cnt_arr[i]
            tCnt += t_cnt_arr[i]
            fCnt += f_cnt_arr[i]
            logging.info(strResult)
            result.append(strResult)

        strResult = "---------------------------------------------------------------------------------------------------"
        logging.info(strResult)

        strResult = "Total Category=" + self.spaceprint(str(len(labels)), 11) + " "
        strResult += "TotalCnt=" + self.spaceprint(str(totCnt), 8) + " "
        strResult += "TrueCnt=" + self.spaceprint(str(tCnt), 8) + " "
        strResult += "FalseCnt=" + self.spaceprint(str(fCnt), 8) + " "
        if totCnt != 0:
            strResult += "True Percent(TrueCnt/TotalCnt*100)=" + str(round(tCnt / totCnt * 100)) + "%"
        logging.info(strResult)
        result.append(strResult)
        logging.info("###################################################################################################")
    def eval(self, node_id, conf_data, data=None, result=None):
        logging.info("run NeuralNetNodeResnet eval")

        if data is None:
            self.eval_flag = "T"
        else:
            self.eval_flag = "E"

        # get data & dataconf
        test_data, dataconf = self.get_input_data(self.feed_node, self.cls_pool, self.eval_feed_name)

        self.eval_run(test_data)

        return self.eval_data
####################################################################################################################
    def predict(self, node_id, filelist):
        """Predict labels for the given files using the stored residual-net model."""
        logging.info("run NeuralNetNodeResnet Predict")
        self.node_id = node_id
        self._init_value()

        # net, data config setup
        data_node_name = self._get_backward_node_with_type(node_id, 'data')
        dataconf = WorkFlowNetConfCNN().get_view_obj(data_node_name[0])
        self._set_netconf_parm()
        self._set_dataconf_parm(dataconf)

        # data shape change MultiValueDict -> nd array
        filename_arr, filedata_arr = self.change_predict_fileList(filelist, dataconf)

        # get unique key
        self.load_batch = self.get_active_batch(self.node_id)
        unique_key = '_'.join([node_id, self.load_batch])
        logging.info("getModelPath:" + self.model_path + "/" + self.load_batch + self.file_end)

        # create tensorflow graph
        if (NeuralNetModel.dict.get(unique_key)):
            self = NeuralNetModel.dict.get(unique_key)
            graph = NeuralNetModel.graph.get(unique_key)
        else:
            self.get_model_resnet()

            NeuralNetModel.dict[unique_key] = self
            NeuralNetModel.graph[unique_key] = tf.get_default_graph()
            graph = tf.get_default_graph()

        pred_return_data = {}
        for i in range(len(filename_arr)):
            file_name = filename_arr[i]
            file_data = filedata_arr[i]

            logits = self.model.predict(file_data)

            labels = self.netconf["labels"]
            pred_cnt = self.netconf["param"]["predictcnt"]
            return_data = self.set_predict_return_cnn_img(labels, logits, pred_cnt)
            pred_return_data[file_name] = return_data
            logging.info("Return Data.......................................")
            logging.info(pred_return_data)

        return pred_return_data
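# Usage sketch (hypothetical names): predict() takes a node id and a dict-like
# upload (e.g. a Django MultiValueDict of files) and returns, per file name,
# the top-N labels ("key") and their scores ("val"):
#
#     result = NeuralNetNodeReNet().predict("<node id>", request.FILES)
#     # {"cat.jpg": {"key": ["cat", "dog", ...], "val": [0.91, 0.05, ...]}}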