Source code for cluster.automl.automl_runmanager

from master.automl.automl import AutoMlCommon
from master.network.nn_common_manager import NNCommonManager
from master.workflow.init.workflow_init_simple import WorkFlowSimpleManager
from master.workflow.common.workflow_common import WorkFlowCommon
import random, logging, copy
from django.db import connection
from common.utils import *
from cluster.service.service_train_task import train
from celery import group
from celery.task.control import inspect

def automl_run(nnid):
    """Build an AutoMlRunManager for ``nnid`` and execute its ``run`` method.

    :param nnid: network id handed unchanged to ``AutoMlRunManager``
    :return: None (the value returned by ``run`` is discarded)
    """
    manager = AutoMlRunManager(nnid)
    manager.run()
class AutoMlRunManager:
    """Run a generation-based hyper-parameter search for one network id.

    For every generation the manager creates new network versions with
    randomly resolved configuration values, trains them, records a summary
    and keeps only the best ``survive`` networks for the next generation.
    """

    def __init__(self, nn_id):
        """Load the AutoML settings stored for ``nn_id``.

        :param nn_id: network id whose AutoML configuration is loaded
        """
        self.nn_id = nn_id
        self.auto_ml_info = AutoMlCommon(nn_id)
        self.conf_info = self.auto_ml_info.conf_info
        self.parm_info = self.auto_ml_info.parm_info
        self.stat_info = self.auto_ml_info.stat_info
        self.net_type = self.auto_ml_info.net_type
        # 'bygen' collects one result list per generation, 'best' holds the
        # survivors of the most recent generation.
        self.summary = {'bygen': [], 'best': []}
        # When True, train_networks runs every training call in-process.
        self.debug_mode = False

    def run(self):
        """Run the whole search loop.

        Reads ``generation``, ``population`` and ``survive`` from
        ``self.parm_info``. Any exception is logged and swallowed.

        :return: list of surviving network dicts of the last generation,
                 or None when an error was logged
        """
        try:
            # genetic algorithm hyper parameters
            generation = self.parm_info.get('generation')
            population = self.parm_info.get('population')
            survive = self.parm_info.get('survive')

            networks = []
            for idx in range(generation):
                # The first generation is created in full; later ones only
                # refill the slots freed by discarded networks.
                count = population if idx == 0 else population - survive
                created = self.create_networks(idx, count)
                if created is None:
                    # create_networks logs its own error and returns None;
                    # stop here instead of failing on the tuple unpacking.
                    raise RuntimeError(
                        "network creation failed for generation {0}".format(idx))
                gen_nets, ver_data_sets = created
                networks = networks + gen_nets
                if idx == 0:
                    AutoMlCommon().reset_stat_obj(self.nn_id)

                # train & evaluate networks
                networks = self.train_networks(networks)
                if networks is None:
                    # train_networks logs its own error and returns None.
                    raise RuntimeError(
                        "network training failed for generation {0}".format(idx))
                # set each train set flag to fin
                self.set_train_finish(ver_data_sets)
                # update training progress
                self.update_summary(networks, survive)
                # sort & discard
                networks = self.discard_inferior(networks, survive)
            return networks
        except Exception as e:
            logging.error("Error on running AutoML alogorithm : {0}".format(e))

    def update_summary(self, networks, survive):
        """Record the result of one generation in ``self.summary`` and store it.

        Networks are ranked by ``acc`` (descending). The first ``survive``
        entries get ``survive=True``, the rest ``survive=False``; the flag is
        written onto the network dicts themselves.

        :param networks: network dicts of one generation
        :param survive: number of networks that survive
        :return: None
        """
        ranked = sorted(networks, key=lambda x: x.get('acc'), reverse=True)
        survivors = [self.set_value(net, 'survive', True) for net in ranked[0:survive]]
        dropped = [self.set_value(net, 'survive', False) for net in ranked[survive:]]
        self.summary['best'] = survivors
        self.summary['bygen'].append(survivors + dropped)
        AutoMlCommon().update_stat_obj(self.nn_id, self.summary)

    def set_value(self, data_set, key, value):
        """Set ``data_set[key] = value`` in place and return ``data_set``."""
        data_set[key] = value
        return data_set

    def save_summary(self, info):
        """Replace the 'best' entry of the summary with ``info`` and store it.

        :param info: value stored under ``self.summary['best']``
        :return: None
        """
        self.summary['best'] = info
        AutoMlCommon().update_stat_obj(self.nn_id, self.summary)

    def discard_inferior(self, networks, survive):
        """Keep only the ``survive`` networks with the highest ``acc``.

        :param networks: list of network dicts
        :param survive: number of networks to keep
        :return: new list, sorted by ``acc`` descending, at most ``survive`` long
        """
        ranked = sorted(networks, key=lambda x: x.get('acc'), reverse=True)
        return ranked[0:survive]

    def train_networks(self, networks):
        """Train every network whose ``flag`` is not yet set.

        In debug mode ``train`` is called directly one network at a time;
        otherwise the trainings are sent as one celery group and joined.
        Each trained network dict gets its ``acc`` and ``flag`` updated in
        place. Any exception is logged and swallowed.

        :param networks: list of network dicts
        :return: the same list, or None when an error was logged
        """
        try:
            if self.debug_mode:
                # for debug you can run all tasks on django process
                for network in networks:
                    if network['flag']:
                        continue
                    result = train(network.get('nn_id'), str(network.get('nn_wf_ver_id')))
                    key = '_'.join([str(network['nn_id']), str(network['nn_wf_ver_id'])])
                    network['acc'] = result[key].get('accuracy')
                    network['flag'] = True
                return networks

            # using cluster server with celery for genetic algorithm
            tasks = [
                train.subtask((network.get('nn_id'), str(network.get('nn_wf_ver_id'))))
                for network in networks
                if not network['flag']
            ]
            if not tasks:
                # Nothing left to train; avoid dispatching an empty group.
                return networks
            results = group(tasks).apply_async().join()

            # Result dicts are looked up by "<nn_id>_<version>".
            keyed = [
                ('_'.join([str(network['nn_id']), str(network['nn_wf_ver_id'])]), network)
                for network in networks
            ]
            for result in results:
                if result is None:
                    continue
                for key, network in keyed:
                    if key not in result or result[key] is None:
                        continue
                    accuracy = result[key].get('accuracy')
                    if accuracy is not None:
                        network['acc'] = accuracy
                        network['flag'] = True
            return networks
        except Exception as e:
            logging.error("Error on training : {0} ".format(e))

    def create_networks(self, generation, number):
        """Create ``number`` new network versions for one generation.

        For each version: create the version row, create the workflow,
        resolve a random configuration from a deep copy of ``self.conf_info``
        and write the per-node settings. Any exception is logged and
        swallowed.

        :param generation: generation index
        :param number: how many networks to create
        :return: tuple ``(networks, ver_data_sets)``, or None when an error
                 was logged
        """
        try:
            networks = []
            ver_data_sets = []
            for _ in range(number):
                # (1) create version
                nn_wf_ver_id, ver_data_set = self.create_version(str(generation))
                ver_data_sets.append(ver_data_set)

                # (2) create state & graph flow
                WorkFlowSimpleManager().create_workflow(self.nn_id, nn_wf_ver_id, self.net_type)
                all_node_list = self.get_all_nodes_list(self.nn_id, nn_wf_ver_id)

                # (3) generate conf format for new train & set netconf node
                node_confs = self._generate_random_case(copy.deepcopy(self.conf_info))

                # (4) set node params
                for node in all_node_list:
                    self._configure_node(node, nn_wf_ver_id, node_confs)

                # (5) summary entry for this network
                networks.append({
                    'nn_id': self.nn_id,
                    'generation': generation,
                    'nn_wf_ver_id': nn_wf_ver_id,
                    'acc': 0.0,
                    'flag': False,
                    'survive': True,
                })
            return networks, ver_data_sets
        except Exception as e:
            logging.error("Error AutoML create network : {0}".format(e))

    def _configure_node(self, node, nn_wf_ver_id, node_confs):
        """Write the generated config and the path settings for one node."""
        node_name = node.get('nn_wf_node_name')
        menu = node.get('wf_task_menu_id_id')
        nn_id = str(self.nn_id)
        ver = str(nn_wf_ver_id)

        def view_key():
            # Built lazily so nodes that need no update never touch the name.
            return '_'.join([nn_id, ver, node_name])

        if node_name in node_confs:
            WorkFlowCommon().set_view_obj(view_key(), node_confs[node_name])
        if menu in ("data", "eval"):
            update_data = {}
            update_data['source_path'] = get_source_path(nn_id, "common", node_name)
            update_data['store_path'] = get_store_path(nn_id, "common", node_name)
            WorkFlowCommon().update_view_obj(view_key(), update_data)
        if menu == "netconf":
            model_path = get_model_path(nn_id, ver, node_name)
            # The same path is stored under both spellings of the key.
            update_data = {'model_path': model_path, 'modelpath': model_path}
            WorkFlowCommon().update_view_obj(view_key(), update_data)

    def mutation(self, networks, number):
        """Placeholder for mutating survived genes; not implemented.

        :param networks: networks info
        :param number: number of genes to generate
        :return: None
        """
        pass

    def create_version(self, generation):
        """Insert a new version row for ``self.nn_id``.

        The new version id is the current maximum version plus one.

        :param generation: generation label (``create_networks`` passes a str)
        :return: tuple ``(nn_wf_ver_id, input_data)`` where ``input_data`` is
                 the dict that was inserted
        """
        nnCommonManager = NNCommonManager()
        nn_wf_ver_id = nnCommonManager.get_nn_max_ver(self.nn_id) + 1
        input_data = {}
        input_data['nn_wf_ver_id'] = nn_wf_ver_id
        # NOTE(review): this field receives the generation, not the nn id -
        # kept as in the original, confirm this is intended.
        input_data['nn_def_list_info_nn_id'] = generation
        input_data['automl_gen'] = generation
        input_data['condition'] = "2"  # 1 Pending, 2 Progress, 3 Finish, 4 Error
        input_data['active_flag'] = "N"
        # str() keeps the join working when nn_id or generation is not a str.
        input_data['nn_wf_ver_desc'] = " ".join(
            ["id:", str(self.nn_id), "gen:", str(generation), "ver:", str(nn_wf_ver_id)])
        input_data['nn_id'] = self.nn_id
        nnCommonManager.insert_nn_wf_info(input_data)
        return nn_wf_ver_id, input_data

    def set_train_finish(self, ver_data_sets):
        """Mark every given version as finished (condition "3").

        Each dict is modified in place: ``nn_id`` is removed and
        ``condition`` is set to "3" before it is passed to the update call.

        :param ver_data_sets: version dicts as returned by ``create_version``
        :return: True on success, False when an error occurred (it is logged)
        """
        try:
            for input_data in ver_data_sets:
                nnCommonManager = NNCommonManager()
                # pop() instead of del so a dict that was already processed
                # does not abort the whole update.
                input_data.pop('nn_id', None)
                input_data['condition'] = "3"  # 1 Pending, 2 Progress, 3 Finish, 4 Error
                nnCommonManager.update_nn_wf_info(self.nn_id, input_data)
            return True
        except Exception as e:
            logging.error("Error on updating train finish state : {0}".format(e))
            return False

    def _generate_random_case(self, conf_info):
        """Replace every auto-format dict below ``conf_info`` with a concrete value.

        An auto-format dict is one accepted by ``_check_format``. Containers
        are modified in place and ``conf_info`` itself is returned; it is
        never converted even if it looks like an auto-format dict.

        :param conf_info: dict / list configuration tree (any other value is
                          returned unchanged)
        :return: ``conf_info`` with all nested auto-format dicts resolved
        :raises Exception: when a conversion fails
        """
        try:
            return self._resolve_children(conf_info)
        except Exception as e:
            raise Exception("error on automl generate random case : {0}".format(e)) from e

    def _resolve_children(self, node):
        """Resolve all direct and nested children of a dict or list in place."""
        if isinstance(node, dict):
            for key in list(node):
                node[key] = self._resolve_node(node[key])
        elif isinstance(node, list):
            for i, item in enumerate(node):
                node[i] = self._resolve_node(item)
        return node

    def _resolve_node(self, node):
        """Convert ``node`` if it is auto-format, otherwise descend into it."""
        if self._check_format(node):
            # The converted value may itself contain auto-format dicts.
            return self._resolve_node(self._format_conv(node))
        return self._resolve_children(node)

    def _find_leaf(self, conf_info):
        """Find the first dict/list child whose text form contains "auto".

        No longer used by ``_generate_random_case``; kept for callers that
        may rely on it.

        :param conf_info: dict or list to search
        :return: the key for a dict (or None); ``(index, item)`` for a list
                 (or ``(None, None)``)
        :raises Exception: when ``conf_info`` is neither dict nor list
        """
        if isinstance(conf_info, dict):
            for key, value in conf_info.items():
                if "auto" in str(value) and isinstance(value, (dict, list)):
                    return key
            return None
        if isinstance(conf_info, list):
            for i, row in enumerate(conf_info):
                # Plain values such as the string "autoencoder" are skipped
                # instead of being treated like dicts.
                if "auto" in str(row) and isinstance(row, (dict, list)):
                    return i, row
            return None, None
        raise Exception("_find_leaf error")

    def _check_format(self, auto_form):
        """Tell whether ``auto_form`` is a dict with a non-None "auto" entry.

        :param auto_form: value to inspect
        :return: True or False
        """
        return isinstance(auto_form, dict) and auto_form.get("auto") is not None

    def _format_conv(self, auto_form):
        """Turn one auto-format dict into a concrete configuration value.

        * ``auto == False``: return ``option`` unchanged.
        * no ``option``: ``auto`` is ``(start, end, step)``; return a random
          float between start and end if any of the three is a float,
          otherwise a random int from the range (bounds are swapped when
          start > end; start is returned when start == end).
        * ``option`` is a list: ``auto`` is ``(start, end, step)`` and selects
          a random index into ``option``.
        * anything else: return ``option``.

        :param auto_form: dict with "auto" and optionally "option"
        :return: the concrete value
        :raises Exception: when the conversion fails
        """
        try:
            auto = auto_form.get('auto')
            option = auto_form.get('option')

            # Equality (not identity) on purpose, as in the original check.
            if auto == False:  # noqa: E712
                return option

            if option is None:
                st, en, ir = auto
                if any(isinstance(v, float) for v in (st, en, ir)):
                    return random.uniform(st, en)
                if st == en:
                    # randrange would reject the empty range.
                    return st
                low, high = (st, en) if en > st else (en, st)
                return random.randrange(low, high, ir)

            if isinstance(option, list):
                st, en, ir = auto
                return option[random.randrange(st, en, ir)]

            return option
        except Exception as e:
            raise Exception("error on automl format conv : {0}".format(e)) from e

    def get_all_nodes_list(self, nn_id, wf_ver):
        """Query the workflow nodes of one network version.

        Runs a raw SQL join of the node table with the submenu rule table,
        filtered by the state id ``"<nn_id>_<wf_ver>"``.

        :param nn_id: network id
        :param wf_ver: workflow version id
        :return: rows as produced by ``dictfetchall``
        :raises Exception: when the query fails
        """
        # make query string (use raw query only when cate is too complicated)
        try:
            query_list = []
            query_list.append("SELECT ND.nn_wf_node_id, ND.wf_task_submenu_id_id, SB.wf_task_menu_id_id, ND.nn_wf_node_name ")
            query_list.append("FROM master_NN_WF_NODE_INFO ND JOIN master_WF_TASK_SUBMENU_RULE SB ")
            query_list.append(" ON ND.wf_task_submenu_id_id = SB.wf_task_submenu_id ")
            query_list.append("WHERE ND.wf_state_id_id = %s")

            # parm_list : set parm value as list
            parm_list = [str(nn_id) + "_" + str(wf_ver)]

            with connection.cursor() as cursor:
                cursor.execute(''.join(query_list), parm_list)
                row = dictfetchall(cursor)
            return row
        except Exception as e:
            raise Exception(e) from e