Source code for cluster.service.service_train_task

from __future__ import absolute_import, unicode_literals
from django.db import connection
from celery import shared_task
from master import models
from cluster.common.common_node import WorkFlowCommonNode
from common.utils import *
import logging
import logging.config
from logging import FileHandler
from django.conf import settings
import datetime


@shared_task
def train(nn_id, wf_ver):

    log_home = "/root"
    _celery_log_dir = make_celery_dir_by_datetime(log_home)
    celery_log_dir = make_and_exist_directory(_celery_log_dir)

    celery_log_file = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    _filename = str(nn_id) + "_" + str(wf_ver) + "_" + celery_log_file + ".log"

    celery_log_file = celery_log_dir + _filename

    logging.config.dictConfig(settings.LOGGING)
    logger = logging.getLogger()
    task_handler = FileHandler(celery_log_file)
    logger.addHandler(task_handler)
    logging.info("=============================================================")
    logging.info("[Train Task] Start Celery Job {0} {1}".format(nn_id, wf_ver))
    logging.info("=============================================================")
    result = WorkFlowTrainTask()._exec_train(nn_id, wf_ver)
    logging.info("=============================================================")
    logging.info("[Train Task] Done Celery Job {0} {1} : {2}".format(nn_id, wf_ver, result))
    logging.info("=============================================================")
    return result
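
# A minimal caller sketch, not part of the original module: the shared task above
# would normally be queued through Celery's standard delay()/apply_async() API.
# The nn_id and wf_ver values below are placeholders, not values from this project.
#
#     from cluster.service.service_train_task import train
#
#     # Queue the training job asynchronously; Celery returns an AsyncResult handle.
#     async_result = train.delay("nn00001", "1")
#     # Optionally block for the result dict produced by WorkFlowTrainTask._exec_train.
#     result_info = async_result.get(timeout=3600)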

def make_celery_dir_by_datetime(home_dir):
    """
    Return a directory name based on the current datetime
    (make a dated directory name using the datetime functions)
    Args:
        home_dir: base path under which the dated directory is created
    Returns:
        directory name built from the current date
    """
    celery_log_dir = datetime.datetime.now().strftime('%Y-%m-%d')
    celery_work_dir = home_dir + "/" + celery_log_dir + "/"
    return celery_work_dir
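
# train() also calls make_and_exist_directory, which comes in through the wildcard
# import of common.utils above. The sketch below is an assumption about that helper
# (create the directory when missing, return the path); the project's actual
# implementation may differ.
#
#     import os
#
#     def make_and_exist_directory(dir_path):
#         # Assumed behaviour: create the directory tree if it does not exist yet
#         # and hand the same path back to the caller.
#         os.makedirs(dir_path, exist_ok=True)
#         return dir_path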
class WorkFlowTrainTask(WorkFlowCommonNode):
    """
    Executes a training workflow by running its nodes in graph order.
    """

    def _exec_train(self, nn_id, wf_ver):
        """
        start train with the predefined workflow process
        :param nn_id: neural network id
        :param wf_ver: workflow version
        :return: dict of node results keyed by "<net_id>_<net_ver>"
        """
        try:
            self.nn_id = nn_id
            self.wf_ver = wf_ver

            # get node sequence list
            node_list = self._get_all_nodes_list()
            if len(node_list) == 0:
                return None
            cls_list = self._get_cached_class_pool(node_list)
            node_rel_list = self._get_node_rel_data()
            node_graph_set = self._create_node_graph(node_list, cls_list, node_rel_list)
            first_node = self._find_first_node(node_graph_set)

            # execute nodes in sequence
            result_info = {}
            while True:
                search_info = self._search_next_node(first_node)
                if search_info[0] == False:
                    break
                first_node = search_info[1]
                if search_info[2] is not None:
                    _relation = self._get_node_relation(nn_id, wf_ver, search_info[2].get_node_name())
                    conf_data = {}
                    conf_data['node_id'] = search_info[2].get_node_name()
                    conf_data['node_list'] = node_list
                    conf_data['node_prev'] = _relation['prev']
                    conf_data['node_next'] = _relation['next']
                    conf_data['nn_id'] = nn_id
                    conf_data['wf_ver'] = wf_ver
                    conf_data['cls_pool'] = cls_list
                    logging.info("[Node Start] {0}".format(search_info[2].node_name))
                    key = '_'.join([search_info[2].net_id, search_info[2].net_ver])
                    result_info[key] = search_info[2].run(conf_data)
                    logging.info("[Node End] {0}".format(search_info[2].node_name))
            return result_info
        except Exception as e:
            logging.error("Error on node [{0}][{1}]".format(search_info[2].node_name, e))
            raise Exception(e)

    def _search_next_node(self, graph_node):
        """
        pick the node to execute now and the node to visit next
        :param graph_node: current position in the node graph
        :return: (keep-running flag, next node, node to execute now)
        """
        try:
            stop_flag = True
            next_task = None
            current_task = None
            if graph_node is None:
                return False, next_task, current_task
            # a node becomes executable once all of its predecessors were visited
            if graph_node.get_search_flag() == False and graph_node.check_prev() == -1:
                graph_node.set_search_flag()
                current_task = graph_node
            if graph_node.check_prev() != -1:
                next_task = graph_node.get_prev_node_as_dict()[graph_node.check_prev()]
            elif graph_node.check_next() != -1:
                next_task = graph_node.get_next_node_as_dict()[graph_node.check_next()]
            return stop_flag, next_task, current_task
        except Exception:
            raise Exception("search net node error")

    def _find_first_node(self, linked_list):
        """
        get the first node (one without predecessors) of the graph
        :param linked_list:
        :return:
        """
        while len(linked_list.get_prev_node()) > 0:
            linked_list = linked_list.get_prev_node()[0]
        return linked_list

    def _get_all_nodes_list(self):
        """
        get every node of this workflow state together with its menu info
        :return: list of node rows as dicts
        """
        # build the query string (use a raw query only when the case is too complicated)
        try:
            query_list = []
            query_list.append("SELECT ND.nn_wf_node_id, ND.wf_task_submenu_id_id, SB.wf_task_menu_id_id, ND.nn_wf_node_name ")
            query_list.append("FROM master_NN_WF_NODE_INFO ND JOIN master_WF_TASK_SUBMENU_RULE SB ")
            query_list.append(" ON ND.wf_task_submenu_id_id = SB.wf_task_submenu_id ")
            query_list.append("WHERE ND.wf_state_id_id = %s")

            # parm_list : set the parameter values as a list
            parm_list = []
            parm_list.append(str(self.nn_id) + "_" + str(self.wf_ver))

            with connection.cursor() as cursor:
                cursor.execute(''.join(query_list), parm_list)
                row = dictfetchall(cursor)
            return row
        except Exception as e:
            raise Exception(e)

    def _get_cached_class_pool(self, node_name_list):
        """
        load and cache the executable class for every node
        :param node_name_list:
        :return: dict mapping node id to its loaded class
        """
        try:
            class_list = {}
            for i in range(len(node_name_list)):
                _path, _cls = self.get_cluster_exec_class(node_name_list[i]['nn_wf_node_id'])
                class_list[node_name_list[i]['nn_wf_node_id']] = self.load_class(_path, _cls)
            return class_list
        except Exception as e:
            raise Exception(e)

    def _get_node_rel_data(self):
        """
        get the predecessor/successor relations between the nodes of this workflow
        :return: list of [prev_node_id, next_node_id] pairs
        """
        return_arr = []
        query_set = models.NN_WF_NODE_RELATION.objects.filter(wf_state_id=self.nn_id + "_" + self.wf_ver)
        for data in query_set:
            return_arr.append([data.nn_wf_node_id_1, data.nn_wf_node_id_2])
        return return_arr

    def _create_node_graph(self, node_list, class_list, node_rel_list):
        """
        build the node graph by wiring prev/next links between the loaded classes
        :return: the class instance of the first node in node_list
        """
        try:
            # set initial node info
            for node in node_list:
                cls = class_list[node.get('nn_wf_node_id')]
                cls.set_node_name(node.get('nn_wf_node_id'))
                cls.set_node_type(node.get('wf_task_submenu_id_id'))
                cls.set_node_grp(node.get('wf_task_menu_id_id'))
                cls.set_node_def(node.get('nn_wf_unique_keynode_name'))
                cls.set_net_node_id(node.get('nn_wf_node_name'))
                cls.set_net_ver(self.wf_ver)
                cls.set_net_id(self.nn_id)
                for rel_node in node_rel_list:
                    if node.get('nn_wf_node_id') == rel_node[0]:
                        cls.set_next_node(rel_node[1], class_list[rel_node[1]])
                    if node.get('nn_wf_node_id') == rel_node[1]:
                        cls.set_prev_node(rel_node[0], class_list[rel_node[0]])
                class_list[node.get('nn_wf_node_id')] = cls

            # wire the links again in reverse order so both directions reference the updated classes
            for node in reversed(node_list):
                cls = class_list[node.get('nn_wf_node_id')]
                for rel_node in node_rel_list:
                    if node.get('nn_wf_node_id') == rel_node[0]:
                        cls.set_next_node(rel_node[1], class_list[rel_node[1]])
                    if node.get('nn_wf_node_id') == rel_node[1]:
                        cls.set_prev_node(rel_node[0], class_list[rel_node[0]])
                class_list[node.get('nn_wf_node_id')] = cls
            return class_list[node_list[0].get('nn_wf_node_id')]
        except Exception as e:
            raise Exception(e)
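
# _get_all_nodes_list relies on a dictfetchall helper that also comes from the
# common.utils wildcard import. The sketch below assumes it follows the usual
# Django-documentation pattern of returning all cursor rows as a list of dicts;
# the project's own implementation may differ.
#
#     def dictfetchall(cursor):
#         # Assumed helper: map every row of the raw-SQL result onto its column names.
#         columns = [col[0] for col in cursor.description]
#         return [dict(zip(columns, row)) for row in cursor.fetchall()]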