Source code for cluster.service.service_single_task
from __future__ import absolute_import, unicode_literals
import importlib
from celery import shared_task
from cluster.common.common_node import WorkFlowCommonNode
@shared_task
def single_run(nn_id, wf_ver, node) :
print ("[Train Task] Start Celery Job ")
result = WorkFlowSingleTask()._run_single_node(nn_id, wf_ver, node)
return result
[docs]class WorkFlowSingleTask(WorkFlowCommonNode):
def _run_single_node(self, nn_id, wf_ver, node):
"""
run given single node directly and return result
:param obj: nn_id, ver, node and etc
:return:
"""
try:
self.nn_id = nn_id
self.wf_ver = wf_ver
node_id = nn_id + '_' + wf_ver + '_' + node
result_info = []
# execute nodes by sequece
# 밑져야 본전이니 전화 후는 그 들고 있자.
_relation = self._get_node_relation(nn_id, wf_ver, node_id)
_path, _cls = self.get_cluster_exec_class(node_id)
conf_data = {}
conf_data['node_id'] = node_id
conf_data['node_list'] = None
conf_data['node_prev'] = _relation['prev']
conf_data['node_next'] = _relation['next']
conf_data['nn_id'] = nn_id
conf_data['wf_ver'] = wf_ver
result_info.append(self.load_class(_path, _cls).run(conf_data))
return result_info
except Exception as e:
raise Exception(e)