Source code for master.workflow.data.workflow_data_text

from master.workflow.data.workflow_data import WorkFlowData
from common import utils
from master import models
import os

class WorkFlowDataText(WorkFlowData):
    """
    Workflow data node for text sources.

    1. Definition
        Reuse data already saved in HDF5 format
    2. Tables
        NN_WF_NODE_INFO (NODE_CONFIG_DATA : Json Field)

    The node configuration is read from / written to the
    ``node_config_data`` field of the ``NN_WF_NODE_INFO`` row addressed by
    ``nn_wf_node_id``. Read accessors load that configuration once per
    instance and keep it in ``self.conf``.
    """

    def __init__(self, key=None):
        """
        Store the node key used to look up the configuration row.

        :param key: value matched against NN_WF_NODE_INFO.nn_wf_node_id
        :return: None
        """
        self.key = key

    @staticmethod
    def _node_id(nnid, wfver, node):
        """
        Build the NN_WF_NODE_INFO key "<nnid>_<wfver>_<node>".

        :return: str
        """
        return "_".join([str(nnid), str(wfver), str(node)])

    def _ensure_conf(self):
        """
        Return the node configuration, loading it on first use.

        The configuration is cached on the instance as ``self.conf``; the
        database is queried only when the instance has no ``conf`` attribute
        yet.

        :return: the node_config_data object of this node
        """
        if 'conf' not in self.__dict__:
            self.conf = self.get_step_source()
        return self.conf

    def get_preview_data(self):
        """
        Preview is not implemented for text data.

        :return: None
        """
        return None

    def set_preview_data(self):
        """
        Preview is not implemented for text data.

        :return: None
        """
        return None

    def get_step_source(self):
        """
        getter for source step

        Reads the row whose nn_wf_node_id equals ``self.key``.

        :return: obj(json) node_config_data, used to make the view
        :raises Exception: wraps any error raised by the lookup
        """
        try:
            obj = models.NN_WF_NODE_INFO.objects.get(nn_wf_node_id=self.key)
            return getattr(obj, 'node_config_data')
        except Exception as e:
            # Exception type is kept for callers; "from e" preserves the cause.
            raise Exception(e) from e

    def get_sql_stmt(self):
        """
        :return: 'source_sql' entry of the node configuration (None if absent)
        """
        return self._ensure_conf().get('source_sql')

    def get_src_type(self):
        """
        :return: 'source_type' entry of the node configuration (None if absent)
        """
        return self._ensure_conf().get('source_type')

    def get_src_server(self):
        """
        :return: 'source_server' entry of the node configuration (None if absent)
        """
        return self._ensure_conf().get('source_server')

    def get_parse_type(self):
        """
        :return: 'source_parse_type' entry of the node configuration (None if absent)
        """
        return self._ensure_conf().get('source_parse_type')

    def get_source_path(self):
        """
        :return: 'source_path' entry of the node configuration (None if absent)
        """
        return self._ensure_conf().get('source_path')

    def check_step_source(self, obj):
        """
        check step_source process fit to requirement

        All missing keys are collected before raising, so the exception text
        lists every problem found.

        :param obj: config data from view (dict-like)
        :return: True when every required key is present
        :raises Exception: with the concatenated messages of all missing keys
        """
        problems = []
        if 'source_type' not in obj:
            problems.append('source_type (local, hadoop, s3, rdb) not defined')

        # .get() so that a missing source_type is reported through the
        # collected message instead of surfacing as a bare KeyError.
        source_type = obj.get('source_type')

        if source_type == 'local':
            if 'max_sentence_len' not in obj:
                problems.append('max_sentence_len (inteager length of sent) not defined')
            if 'source_parse_type' not in obj:
                problems.append('source_parse_type (raw, excel) not defined')

        # NOTE(review): the message above advertises 'hadoop' while this
        # branch tests 'hbase' - confirm which name callers actually send.
        if source_type in ('rdb', 'hbase', 's3'):
            if 'source_parse_type' not in obj:
                problems.append('source_parse_type (raw, excel) not defined')
            if 'source_server' not in obj:
                problems.append('source_server (local or server management id) not defined')
            if 'source_sql' not in obj:
                problems.append('source_sql (query string) not defined')
            if 'source_path' not in obj:
                problems.append('source_path (source path string) not defined')
            if 'max_sentence_len' not in obj:
                problems.append('max_sentence_len (inteager length of sent) not defined')

        if problems:
            raise Exception(''.join(problems))
        return True

    def put_step_source(self, src, form, nnid, wfver, node, input_data):
        """
        putter for source step

        Only ``src == 'local'`` is handled: the source path is obtained from
        ``utils.get_source_path`` and the configuration row is updated.

        :param src: source type, must be 'local'
        :param form: parse type stored as 'source_parse_type'
        :param nnid: network id part of the node key
        :param wfver: workflow version part of the node key
        :param node: node name part of the node key
        :param input_data: dict providing 'source_server', 'source_sql'
                           and 'max_sentence_len'
        :return: the updated node_config_data
        :raises Exception: for any other src, or wrapping any update error
        """
        if src != 'local':
            # No source path can be derived for other source types; fail with
            # an explicit message rather than an unbound-variable error.
            raise Exception("source type '" + str(src) + "' is not supported (only 'local')")
        try:
            source_path = utils.get_source_path(nnid, wfver, node)
            return self.update_wf_node_info(src, form, nnid, wfver, node, input_data, source_path)
        except Exception as e:
            raise Exception(e) from e

    def update_wf_node_info(self, src, form, nnid, wfver, node, input_data, source_path):
        """
        Write the source settings into the node configuration and save it.

        After saving, ``source_path`` is created on disk when nothing exists
        at that path yet.

        :param src: stored as 'source_type'
        :param form: stored as 'source_parse_type'
        :param nnid: network id part of the node key
        :param wfver: workflow version part of the node key
        :param node: node name part of the node key
        :param input_data: dict providing 'source_server', 'source_sql'
                           and 'max_sentence_len'
        :param source_path: stored as 'source_path'
        :return: the updated node_config_data
        """
        obj = models.NN_WF_NODE_INFO.objects.get(nn_wf_node_id=self._node_id(nnid, wfver, node))
        config_data = getattr(obj, 'node_config_data')
        config_data['source_type'] = src
        config_data['source_parse_type'] = form
        config_data['source_server'] = input_data['source_server']
        config_data['source_sql'] = input_data['source_sql']
        config_data['source_path'] = source_path
        config_data['max_sentence_len'] = input_data['max_sentence_len']
        setattr(obj, 'node_config_data', config_data)
        obj.save()

        if not os.path.exists(source_path):
            os.makedirs(source_path, exist_ok=True)
        return config_data

    def get_step_preprocess(self):
        """
        getter for preprocess

        :return: 'preprocess' entry of the node configuration
        :raises Exception: when the entry is missing or the lookup fails
        """
        try:
            return self._ensure_conf()['preprocess']
        except Exception as e:
            raise Exception(e) from e

    def check_step_preprocess(self, obj):
        """
        Validate the preprocess settings.

        :param obj: config data from view (dict-like)
        :return: True when 'preprocess' is present
        :raises Exception: when 'preprocess' is missing
        """
        if 'preprocess' not in obj:
            raise Exception('preprocess type (mecab, kkma) not defined')
        return True

    def put_step_preprocess(self, src, form, nnid, wfver, node, input_data):
        """
        putter for preprocess

        :param src: not used by this method
        :param form: not used by this method
        :param nnid: network id part of the node key
        :param wfver: workflow version part of the node key
        :param node: node name part of the node key
        :param input_data: dict providing 'preprocess'
        :return: the stored 'preprocess' value
        :raises Exception: on validation failure or any update error
        """
        try:
            self.check_step_preprocess(input_data)
            obj = models.NN_WF_NODE_INFO.objects.get(nn_wf_node_id=self._node_id(nnid, wfver, node))
            config_data = getattr(obj, 'node_config_data')
            config_data['preprocess'] = input_data['preprocess']
            setattr(obj, 'node_config_data', config_data)
            obj.save()
            return input_data['preprocess']
        except Exception as e:
            raise Exception(e) from e

    def get_step_store(self):
        """
        getter for store

        :return: 'store_path' entry of the node configuration
        :raises Exception: when the entry is missing or the lookup fails
        """
        try:
            return self._ensure_conf()['store_path']
        except Exception as e:
            raise Exception(e) from e

    def check_step_store(self, obj):
        """
        Validate the store settings (currently nothing is checked).

        :param obj: config data from view
        :return: True
        """
        return True

    def put_step_store(self, src, form, nnid, wfver, node, input_data):
        """
        putter for store

        The store path comes from ``utils.get_store_path``; it is written to
        the node configuration and created on disk when nothing exists at
        that path yet.

        :param src: not used by this method
        :param form: not used by this method
        :param nnid: network id part of the node key
        :param wfver: workflow version part of the node key
        :param node: node name part of the node key
        :param input_data: config data from view (only passed to the check)
        :return: the stored 'store_path' value
        :raises Exception: wrapping any error raised while updating
        """
        try:
            self.check_step_store(input_data)
            store_path = utils.get_store_path(nnid, wfver, node)
            obj = models.NN_WF_NODE_INFO.objects.get(nn_wf_node_id=self._node_id(nnid, wfver, node))
            config_data = getattr(obj, 'node_config_data')
            config_data['store_path'] = store_path
            setattr(obj, 'node_config_data', config_data)
            obj.save()

            if not os.path.exists(store_path):
                os.makedirs(store_path, exist_ok=True)
            return config_data['store_path']
        except Exception as e:
            raise Exception(e) from e

    def get_max_sent_len(self):
        """
        getter for the maximum sentence length

        :return: 'max_sentence_len' entry of the node configuration
        :raises Exception: when the entry is missing or the lookup fails
        """
        try:
            return self._ensure_conf()['max_sentence_len']
        except Exception as e:
            raise Exception(e) from e