Source code for cluster.data.data_node_frame

from blaze.interactive import data

from cluster.data.data_node import DataNode
import os
from master.workflow.data.workflow_data_frame import WorkFlowDataFrame
from time import gmtime, strftime
import pandas as pd
import tensorflow as tf
import json
from master.workflow.dataconf.workflow_dataconf_frame import WorkflowDataConfFrame as wf_data_conf
from common import utils
from sklearn.preprocessing import LabelEncoder
import logging
from common.utils import *
import shutil
from master.workflow.data.workflow_data_frame import WorkFlowDataFrame as wf_data_frame
from sklearn import preprocessing

class DataNodeFrame(DataNode):
    """
    DataNode configuration.
    NULL handling matters here: categorical columns default to "" and continuous columns to 0.0.
    """
    def run(self, conf_data):
        """
        Run the data node: build the HDF5 and the TFRecord files in one pass.
        :param conf_data:
        :return: None
        """
        self.cls_list = conf_data['cls_pool']
        self._init_node_parm(conf_data['node_id'])
        if self.data_src_type == 'local' and self.type == "csv":
            self.src_local_handler(conf_data)
        if self.data_src_type == 'rdb':
            raise Exception("on development now")
        if self.data_src_type == 's3':
            raise Exception("on development now")
        if self.data_src_type == 'hbase':
            raise Exception("on development now")
    def get_eval_node_file_list(self, conf_data):
        """
        Find the eval data node, resolve its source path, read its CSV files and
        store the resulting cell_feature column types in self.data_conf.

        Args:
            conf_data: workflow information for the nnid

        Returns:
            None
        """
        eval_data_node = [_i for _i, _k in conf_data.get('cls_pool').items() if 'evaldata' in _i]
        data_conf_node_id = [_i for _i, _k in conf_data.get('cls_pool').items() if 'dataconf' in _i]
        eval_data_cls = wf_data_frame(eval_data_node[0])
        eval_source_path = eval_data_cls.source_path
        fp_list = utils.get_filepaths(eval_source_path, file_type='csv')
        for file_path in fp_list:
            df_csv_read = self.load_csv_by_pandas(file_path)
            # make the column types of the csv
            self.data_conf = self.make_column_types(df_csv_read, eval_data_node[0], data_conf_node_id[0])
    def check_eval_node_for_wdnn(self, _conf_data):
        """
        Needed to fetch the categorical values of the eval data.
        Returns the DataConf node id when the workflow is a WDNN.

        Args:
            _conf_data: workflow information for the nnid

        Returns:
            data_conf_node_id: id of the DataConf node ('' when none exists)
        """
        data_conf_node_id = ''
        for _i, _k in self.cls_list.items():
            if 'dataconf' in _i:  # only WDNN workflows carry a DataConf node
                data_conf_node_id = _i
                if 'data_node' not in _conf_data['node_id']:
                    # needed to fetch the eval categorical data; not needed when the eval node itself is running
                    self.get_eval_node_file_list(_conf_data)
        return data_conf_node_id
    def make_label_values(self, _data_dfconf_list, _df_csv_read):
        """
        Put the unique values of the label column into the DataConf.

        Args:
            _data_dfconf_list: workflow information for the nnid
            _df_csv_read: dataframe (train or eval)

        Returns:
            _label: label column name
            _label_type: label type
        """
        _key = _data_dfconf_list
        _nnid = _key.split('_')[0]
        _ver = _key.split('_')[1]
        _node = 'dataconf_node'
        _wf_data_conf = wf_data_conf(_key)
        if hasattr(_wf_data_conf, 'label'):
            _label = _wf_data_conf.label
            _label_type = _wf_data_conf.label_type
            # pass an empty list when there are no label values yet (first run)
            origin_labels_list = _wf_data_conf.label_values if hasattr(_wf_data_conf, 'label_values') else list()
            compare_labels_list = self.set_dataconf_for_labels(_df_csv_read, _label)
            # combine both lists, then update the DB
            self.combined_label_list = utils.get_combine_label_list(origin_labels_list, compare_labels_list)
            _data_conf = dict()
            _data_conf['label_values'] = self.combined_label_list
            if _label_type == 'CONTINUOUS':
                _data_conf['label_values'] = list()
            _wf_data_conf.put_step_source(_nnid, _ver, _node, _data_conf)
        return _label, _label_type
    def make_preprocessing_pandas(self, _df_csv_read_ori, _preprocessing_type, _label):
        """
        Preprocess the pandas dataframe with scikit-learn.
        The label column must not be preprocessed.

        Args:
            _preprocessing_type: any of ['scale', 'minmax_scale', 'robust_scale', 'normalize', 'maxabs_scale']
            _df_csv_read_ori: pandas dataframe
            _label: label column name

        Returns:
            preprocessed dataframe
        """
        if _preprocessing_type is None or _preprocessing_type == 'null':
            logging.info("No Preprocessing")
            result_df = _df_csv_read_ori
        else:
            logging.info("Preprocessing type : {0}".format(_preprocessing_type))
            numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
            for i, v in _df_csv_read_ori.dtypes.iteritems():
                if v in numerics:
                    if i not in _label:
                        if 'scale' in _preprocessing_type:
                            _df_csv_read_ori[i] = preprocessing.scale(_df_csv_read_ori[i].fillna(0.0))
                        if 'minmax_scale' in _preprocessing_type:
                            _df_csv_read_ori[i] = preprocessing.minmax_scale(_df_csv_read_ori[i].fillna(0.0))
                        if 'robust_scale' in _preprocessing_type:
                            _df_csv_read_ori[i] = preprocessing.robust_scale(_df_csv_read_ori[i].fillna(0.0))
                        if 'normalize' in _preprocessing_type:
                            _df_csv_read_ori[i] = preprocessing.normalize(_df_csv_read_ori[i].fillna(0.0))
                        if 'maxabs_scale' in _preprocessing_type:
                            _df_csv_read_ori[i] = preprocessing.maxabs_scale(_df_csv_read_ori[i].fillna(0.0))
            result_df = _df_csv_read_ori
        return result_df
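    # A minimal, hypothetical sketch (not part of the original module) of the scikit-learn calls
    # used above: each scaler accepts a 1-D numeric Series once NaNs have been filled.
    #
    #   import pandas as pd
    #   from sklearn import preprocessing
    #   df = pd.DataFrame({"age": [22, 38, None], "fare": [7.25, 71.28, 8.05]})
    #   df["age"] = preprocessing.minmax_scale(df["age"].fillna(0.0))    # rescale to [0, 1]
    #   df["fare"] = preprocessing.maxabs_scale(df["fare"].fillna(0.0))  # divide by max absolute value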
    def make_drop_duplicate(self, _df_csv_read_ori, _drop_duplicate, _label):
        """
        Drop the whole row when every column except the label is duplicated.

        Args:
            _drop_duplicate: whether duplicate rows should be removed
            _df_csv_read_ori: pandas dataframe
            _label: label column name

        Returns:
            deduplicated dataframe
        """
        if _drop_duplicate is None or _drop_duplicate == 'null' or _drop_duplicate == False:
            logging.info("No Duplicate")
            result_df = _df_csv_read_ori
        else:
            cell_features = _df_csv_read_ori.columns.tolist()
            cell_features.remove(_label)
            result_df = _df_csv_read_ori.drop_duplicates(cell_features, keep="first")
            logging.info("duplicated row delete {0}".format(len(_df_csv_read_ori.index) - len(result_df.index)))
            temp_duplicate_filename = strftime("%Y-%m-%d-%H:%M:%S", gmtime()) + "_dup.csvbk"
            result_df.to_csv(self.data_src_path + "/backup/" + temp_duplicate_filename)
        return result_df
    def src_local_handler(self, conf_data):
        """
        Convert csv to h5 and TFRecord for the data_frame data node.
        1) For WDNN, parse the pandas dataframe, mark each column as CATEGORICAL or CONTINUOUS and
           write it to the DataConf (done from the DataNode, not from the eval node).
           For categorical columns the unique values are written to the DataConf, and when the
           label type is categorical the label's unique values are written as well.
           The dataframe is then preprocessed according to _preprocess_type.
        2) When _multi_node_flag is True a TFRecord file is created as well.
        3) When the workflow is not a WDNN only the H5 file is created.

        Args:
            conf_data: nn_info

        Returns:
            None

        Raises:
        """
        try:
            logging.info("Data node starting : {0}".format(conf_data['node_id']))
            fp_list = utils.get_filepaths(self.data_src_path, file_type='csv')
            _multi_node_flag = self.multi_node_flag
            _preprocess_type = self.data_preprocess_type
            _drop_duplicate = self.drop_duplicate

            dir = self.data_src_path + "/backup"  # create the backup directory
            if not os.path.exists(dir):
                os.makedirs(dir)

            try:
                data_conf_node_id = self.check_eval_node_for_wdnn(conf_data)
                data_dfconf_list = data_conf_node_id

                for file_path in fp_list:
                    if len(data_dfconf_list) == 0:  # not a WDNN
                        df_csv_read = self.load_csv_by_pandas(file_path)
                        self.create_hdf5(self.data_store_path, df_csv_read)
                    if len(data_dfconf_list) > 0:  # WDNN
                        df_csv_read = self.load_csv_by_pandas(file_path)
                        if 'dataconf' in data_dfconf_list:  # the dataconf node is identified here
                            # make the column types of the csv; the eval data is read as well so the
                            # unique values cover both sets
                            # Todo: if the eval and train column types differ, the column should fall back to CATEGORICAL
                            self.data_conf = self.make_column_types(df_csv_read, conf_data['node_id'], data_conf_node_id)
                            # for WDNN, put the label values into the DataConf
                            _label, _label_type = self.make_label_values(data_dfconf_list, df_csv_read)
                            drop_dup_df_csv_read = self.make_drop_duplicate(df_csv_read, _drop_duplicate, _label)
                            _pre_df_csv_read = self.make_preprocessing_pandas(drop_dup_df_csv_read, _preprocess_type, _label)
                            temp_preprocess_filename = strftime("%Y-%m-%d-%H:%M:%S", gmtime()) + "_pre.csvbk"
                            _pre_df_csv_read.to_csv(self.data_src_path + "/backup/" + temp_preprocess_filename)
                            self.create_hdf5(self.data_store_path, _pre_df_csv_read)
                            if _multi_node_flag == True:
                                skip_header = False  # Todo: have to remove if production
                                self.save_tfrecord(file_path, self.data_store_path, skip_header, _pre_df_csv_read, _label, _label_type)
                    file_name_bk = strftime("%Y-%m-%d-%H:%M:%S", gmtime()) + ".csvbk"
                    shutil.copy(file_path, self.data_src_path + "/backup/" + file_name_bk)
                    os.remove(file_path)
            except Exception as e:
                logging.error("Datanode making h5 or tfrecord error {0}".format(e))
                raise Exception(e)

            logging.info("Data node end : {0}".format(conf_data['node_id']))
            return None
        except Exception as e:
            raise Exception(e)
    def multi_load_data(self, node_id, parm='all'):
        pass
    def preprocess_data(self, input_data):
        """
        :param input_data:
        :return:
        """
        if self.data_preprocess_type == 'mecab':
            for key in input_data.keys():
                input_data[key] = self._mecab_parse(input_data[key])
        return input_data
    def save_tfrecord(self, csv_data_file, store_path, skip_header, df_csv_read, label, label_type):
        """
        Creates a TFRecords file for the given input data and example transformation function
        """
        filename = strftime("%Y-%m-%d-%H:%M:%S", gmtime())
        output_file = store_path + "/" + filename + ".tfrecords"
        self.create_tfrecords_file(output_file, skip_header, df_csv_read, label, label_type)
    def create_tfrecords_file(self, output_file, skip_header, df_csv_read, label, label_type):
        """
        Creates a TFRecords file for the given input data and example transformation function
        """
        try:
            writer = tf.python_io.TFRecordWriter(output_file)
            logging.info("Creating TFRecords file at {0} ...".format(output_file))
            CONTINUOUS_COLUMNS, CATEGORICAL_COLUMNS = self.make_continuous_category_list(self.data_conf["cell_feature"])
            print_row_count = 10000
            csv_dataframe = df_csv_read
            for count, row in csv_dataframe.iterrows():
                x = self.create_example_pandas(row, CONTINUOUS_COLUMNS, CATEGORICAL_COLUMNS, label, label_type)
                if count % print_row_count == 0:
                    logging.info("###### TFRecording row count : {0}".format(count))
                writer.write(x.SerializeToString())
            writer.close()
            logging.info("Wrote to {0}".format(output_file))
        except Exception as e:
            raise e
    def make_continuous_category_list(self, cell_feature):
        """
        Build the CONTINUOUS and CATEGORICAL column lists used when creating Examples.
        """
        CONTINUOUS_COLUMNS = list()
        CATEGORICAL_COLUMNS = list()
        for type_column, type_value in cell_feature.items():
            if type_value["column_type"] == 'CONTINUOUS':
                CONTINUOUS_COLUMNS.append(type_column)
            else:
                CATEGORICAL_COLUMNS.append(type_column)
        return CONTINUOUS_COLUMNS, CATEGORICAL_COLUMNS
    def create_example_pandas(self, row, CONTINUOUS_COLUMNS, CATEGORICAL_COLUMNS, label, label_type):
        """
        Convert a pandas dataframe row into a tfrecord example (used for WDNN).

        Args:
            row: dataframe row
            CONTINUOUS_COLUMNS
            CATEGORICAL_COLUMNS
            label
            label_type

        Returns:
            tfrecord example

        Raises:
        """
        try:
            example = tf.train.Example()
            _CONTINUOUS_COLUMNS = CONTINUOUS_COLUMNS[:]
            _CATEGORICAL_COLUMNS = CATEGORICAL_COLUMNS[:]
            try:
                if label in _CATEGORICAL_COLUMNS:
                    _CATEGORICAL_COLUMNS.remove(label)
                if label in _CONTINUOUS_COLUMNS:
                    _CONTINUOUS_COLUMNS.remove(label)
            except Exception as e:
                raise Exception(e)

            # TODO: the extended cell_feature probably needs to be checked here as well
            # For tfrecord the label is encoded here, because later it only comes back as a Tensor;
            # for H5 the feeder does the conversion instead.
            le = LabelEncoder()
            le.fit(self.combined_label_list)
            for col, value in row.items():
                if col in _CATEGORICAL_COLUMNS:
                    if isnan(value):
                        value = ""
                    example.features.feature[col].bytes_list.value.extend([str.encode(value)])
                elif col in _CONTINUOUS_COLUMNS:
                    if isnan(value):
                        value = 0.0
                    example.features.feature[col].float_list.value.extend([float(value)])
                if col == label:
                    # Todo: Category? Continuous?
                    if label_type == "CONTINUOUS":
                        example.features.feature['label'].int64_list.value.extend([int(value)])
                    else:
                        trans = le.transform([value])[0]  # single value, always index 0
                        example.features.feature['label'].int64_list.value.extend([int(trans)])
            return example
        except Exception as e:
            logging.error("make tfrecord column {0} value {1}".format(col, value))
            logging.error("make tfrecord rows {0}".format(row))
            raise Exception(e)
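    # A minimal, hypothetical sketch (not part of the original module) of the tf.train.Example
    # layout produced above: categorical cells go into bytes_list, continuous cells into
    # float_list, and the encoded label into int64_list. Column names and values are illustrative.
    #
    #   example = tf.train.Example()
    #   example.features.feature['occupation'].bytes_list.value.extend([b'engineer'])
    #   example.features.feature['age'].float_list.value.extend([38.0])
    #   example.features.feature['label'].int64_list.value.extend([1])
    #   serialized = example.SerializeToString()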
    def create_hdf5(self, data_path, dataframe):
        """
        Create an hdf5 file from the dataframe.
        :param data_path:
        :param dataframe:
        :return: None
        """
        file_name = strftime("%Y-%m-%d-%H:%M:%S", gmtime()) + ".h5"
        output_path = os.path.join(data_path, file_name)
        hdf = pd.HDFStore(output_path)
        hdf.put('table1', dataframe, format='table', data_columns=True, encoding='UTF-8')
        hdf.close()
    def load_data(self, node_id="", parm='all'):
        """
        load train data
        :param node_id:
        :param parm:
        :return:
        """
        try:
            _multi_node_flag = self.multi_node_flag
            if _multi_node_flag == True:
                file_path = utils.get_filepaths(self.data_store_path, 'tfrecords')
            else:
                file_path = utils.get_filepaths(self.data_store_path, 'h5')
            return file_path
        except Exception as e:
            raise Exception(e)
    def load_csv_by_pandas(self, data_path):
        """
        Read a csv file into a pandas dataframe.
        :param data_path:
        :return: dataframe
        """
        try:
            df_csv_read = pd.read_csv(tf.gfile.Open(data_path),
                                      skipinitialspace=True,
                                      engine="python",
                                      encoding='utf-8-sig')
            return df_csv_read
        except Exception as e:
            raise Exception(e)
    def make_column_types(self, df, node_id, data_dfconf_list):
        """
        Read the csv, work out each column type and store it in data_conf (only when data_conf is empty).
        :param df:
        :param node_id:
        :param data_dfconf_list:
        """
        try:
            data_conf, data_conf_unique_json = self.set_dataconf_for_checktype(df, node_id, data_dfconf_list)
            data_conf_unique_cnt = self.make_unique_value_each_column(df, node_id)
            data_conf.update(data_conf_unique_cnt)

            dataconf_nodes = self._get_forward_node_with_type(node_id, 'dataconf')
            wf_data_conf_node = wf_data_conf(data_dfconf_list)
            if self.dataconf_first_time_check(wf_data_conf_node, node_id):
                self.set_default_dataconf_from_csv(wf_data_conf_node, node_id, data_conf)
                self.set_default_dataconf_from_csv(wf_data_conf_node, node_id, data_conf_unique_cnt)
                self.set_default_dataconf_from_csv(wf_data_conf_node, node_id, data_conf_unique_json)
            if self.dataconf_eval_time_check(wf_data_conf_node, node_id):
                self.set_default_dataconf_from_csv(wf_data_conf_node, node_id, data_conf_unique_json)
            return data_conf
        except Exception as e:
            logging.info("make column type Error {0} line no({1})".format(e, e.__traceback__.tb_lineno))
            raise Exception(e)
    def dataconf_first_time_check(self, _wf_data_conf_node, _node_name):
        """
        Update only when data_conf is empty (or has no conf yet) and the caller is the DataNode.
        :param _wf_data_conf_node:
        :param _node_name: e.g. nn00001_1_data_node
        :return: True when the dataconf should be initialised
        """
        _value = False
        if (len(_wf_data_conf_node.cell_feature) == 0 or 'conf' not in _wf_data_conf_node.__dict__) \
                and ('data_node' in _node_name):
            _value = True
        return _value
    def dataconf_eval_time_check(self, _wf_data_conf_node, _node_name):
        """
        Even when a data_conf already exists, the eval node only adds the unique values.
        :param _wf_data_conf_node:
        :param _node_name: e.g. nn00001_1_evaldata_node
        :return: True when the caller is the eval node
        """
        _value = False
        if 'evaldata' in _node_name:
            _value = True
        return _value
    def make_unique_value_each_column(self, df, node_id):
        """
        Find the categorical columns of the dataframe and return the count of unique values per column.

        Args:
            df: dataframe
            node_id: nnid

        Returns:
            json with the unique value counts

        Raises:
        """
        try:
            data_conf = dict()
            column_cate_unique = dict()
            numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
            for i, v in df.dtypes.iteritems():
                if str(v) not in numerics:  # maybe float columns need handling as well
                    column_cate_unique[i] = df[i].unique().size
            data_conf['unique_cell_feature'] = column_cate_unique
            data_conf_json_str = json.dumps(data_conf)
            data_conf_json = json.loads(data_conf_json_str)
            return data_conf_json
        except Exception as e:
            logging.error("make_unique_value_each_column error : {0}, {1}".format(i, v))
            raise e
    def set_default_dataconf_from_csv(self, wf_data_config, node_id, data_conf):
        """
        Store the given data_conf on the dataconf node.
        Because of the tfrecord path the type is always checked and the conf is only saved when needed.
        :param wf_data_config:
        :param node_id:
        :param data_conf:
        """
        # TODO: the parameters of set_default_dataconf_from_csv need a clean-up
        nnid = node_id.split('_')[0]
        ver = node_id.split('_')[1]
        data_node = "dataconf_node"
        wf_data_config.put_step_source(nnid, ver, data_node, data_conf)
    def set_dataconf_for_checktype(self, df, node_id, data_dfconf_list):
        """
        Read the csv, work out each column type and store it in data_conf (only when data_conf is empty).
        For categorical columns the unique values are collected into cell_feature_unique (used by Keras).
        :param df:
        :param node_id:
        :param data_dfconf_list:
        """
        try:
            # TODO: the parameters of set_default_dataconf_from_csv need a clean-up
            data_conf = dict()
            data_conf_unique_v = dict()
            data_conf_col_unique_v = dict()
            data_conf_col_type = dict()
            numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']

            # for WDNN there is always exactly one data_dfconf, which makes the logic below possible
            if len(data_dfconf_list) > 0:
                _wf_data_conf = wf_data_conf(data_dfconf_list)
                # pass an empty list when there are no unique values yet (first run)
                _cell_feature_unique = _wf_data_conf.cell_feature_unique if hasattr(_wf_data_conf, 'cell_feature_unique') else list()

                for i, v in df.dtypes.iteritems():
                    column_dtypes = dict()
                    column_unique_value = dict()
                    if str(v) in numerics:  # maybe float columns need handling as well
                        col_type = 'CONTINUOUS'
                        columns_unique_value = list()
                    else:
                        col_type = 'CATEGORICAL'
                        columns_unique_value = pd.unique(df[i].fillna('').values.ravel()).tolist()  # nulls must be handled
                    column_dtypes['column_type'] = col_type
                    origin_feature_unique = _cell_feature_unique[i].get('column_u_values') if (i in _cell_feature_unique) else list()
                    # append newly seen values to the values already stored
                    combined_col_u_list = utils.get_combine_label_list(origin_feature_unique, columns_unique_value)
                    column_unique_value['column_u_values'] = combined_col_u_list
                    data_conf_col_type[i] = column_dtypes
                    data_conf_col_unique_v[i] = column_unique_value

            data_conf['cell_feature'] = data_conf_col_type
            data_conf_unique_v['cell_feature_unique'] = data_conf_col_unique_v
            data_conf_json_str = json.dumps(data_conf)  # convert to json
            data_conf_json = json.loads(data_conf_json_str)
            data_conf_unique_json_str = json.dumps(data_conf_unique_v)
            data_conf_unique_json = json.loads(data_conf_unique_json_str)
            return data_conf_json, data_conf_unique_json
        except Exception as e:
            logging.error("set_dataconf_for_checktype {0} {1}".format(e, e.__traceback__.tb_lineno))
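    # A hypothetical illustration (not part of the original module) of the two dicts returned
    # above for a dataframe with an 'age' and an 'occupation' column; values are made up:
    #
    #   data_conf_json = {
    #       "cell_feature": {
    #           "age": {"column_type": "CONTINUOUS"},
    #           "occupation": {"column_type": "CATEGORICAL"}
    #       }
    #   }
    #   data_conf_unique_json = {
    #       "cell_feature_unique": {
    #           "age": {"column_u_values": []},
    #           "occupation": {"column_u_values": ["engineer", "teacher"]}
    #       }
    #   }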
    def set_dataconf_for_labels(self, df, label):
        """
        Read the csv and return the distinct values of the label column.
        :param df:
        :param label:
        """
        # TODO: the parameters of set_default_dataconf_from_csv need a clean-up
        label_values = pd.unique(df[label].values.ravel().astype('str')).tolist()
        return label_values
    def _set_progress_state(self):
        return None

    def _init_node_parm(self, key):
        """
        Init parameters from workflow_data_frame.
        :return:
        """
        try:
            wf_data_frame = WorkFlowDataFrame(key)
            self.type = wf_data_frame.object_type
            self.data_sql_stmt = wf_data_frame.sql_stmt
            self.data_src_path = wf_data_frame.source_path
            self.data_src_type = wf_data_frame.src_type
            self.data_server_type = wf_data_frame.src_server
            self.data_preprocess_type = wf_data_frame.step_preprocess
            self.data_store_path = wf_data_frame.step_store
            self.sent_max_len = wf_data_frame.max_sentence_len
            self.multi_node_flag = wf_data_frame.multi_node_flag
            self.drop_duplicate = wf_data_frame.drop_duplicate
            self.combine_label_list = list()
        except Exception as e:
            raise Exception("WorkFlowDataFrame parms are not set " + str(e))
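
# A minimal, hypothetical usage sketch (not part of the original module). The node id, class pool
# and workflow configuration below are illustrative only; in practice they are supplied by the
# surrounding workflow engine, which also provides the backing DataConf and DataFrame records.
#
#   node = DataNodeFrame()
#   conf_data = {
#       "node_id": "nn00001_1_data_node",              # assumed id format: <nnid>_<ver>_<node>
#       "cls_pool": {"nn00001_1_dataconf_node": None}  # a dataconf entry marks a WDNN workflow
#   }
#   node.run(conf_data)  # reads the CSVs, writes HDF5 (and TFRecord when multi_node_flag is set)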