Source code for cluster.preprocess.pre_node_feed_keras2frame
from cluster.preprocess.pre_node_feed import PreNodeFeed
import tensorflow as tf
import pandas as pd
from master.workflow.dataconf.workflow_dataconf_frame import WorkflowDataConfFrame as wf_data_conf
from master.workflow.data.workflow_data_frame import WorkFlowDataFrame as wf_data_frame
from common import utils
from sklearn.preprocessing import LabelEncoder
import logging
from keras.models import Sequential
from keras.layers import Dense
from sklearn.preprocessing import MinMaxScaler
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelBinarizer
[docs]class PreNodeFeedKerasFrame(PreNodeFeed):
"""
pre_feed_keras2frame
"""
[docs] def run(self, conf_data):
"""
override init class
"""
#전 노드중 dataconf를 찾아 온다 wdnn만 가능
#전체 노드중 data_conf를 넣어서 만든다. 1개 밖에 없음
try:
data_conf_node_name = self.node_name.split('_')[0] + "_" + self.node_name.split('_')[1] +"_dataconf_node"
self._init_node_parm(data_conf_node_name)
super(PreNodeFeedKerasFrame, self).run(conf_data)
#input_features = self.create_feature_columns()
#testself.node_name.split('_')[1]
self.multi_queue_and_h5_print(self.input_paths[0])
except Exception as e:
logging.error("fidder error {0}".format(e))
raise e
def _convert_data_format(self, obj, index):
pass
[docs] def create_feature_columns(self, dataconf = None):
"""
Get feature columns for tfrecord reader
TFRecord에서 feature를 추출
"""
_cell_feature = self.cell_feature
_extend_cell_feature = self.extend_cell_feature
_label = self.label
_label_calues = self.label_values
CONTINUOUS_COLUMNS, CATEGORICAL_COLUMNS = self.make_continuous_category_list(_cell_feature)
CALCULATED_CONTINUOUS_COLUMNS, CALCULATED_CATEGORICAL_COLUMNS = self.add_none_keys_cate_conti_list(CONTINUOUS_COLUMNS,CATEGORICAL_COLUMNS )
# cate를 가지고 formon을 돌리는데 카테로 만들고 extend에 key 있으면 key도 추가 하고
#conti도 for문 돌리고
#마지막에 set을 어떻게 할지도 중요하고
category_tensor = dict()
continuous_tensor = dict()
for cate_item in CALCULATED_CATEGORICAL_COLUMNS:
category_tensor[cate_item] = tf.contrib.layers.sparse_column_with_hash_bucket(cate_item, hash_bucket_size=1000)
for conti_item in CALCULATED_CONTINUOUS_COLUMNS:
continuous_tensor[conti_item] = tf.contrib.layers.real_valued_column(conti_item, dtype=tf.float32)
#NONE은 다 처리 되었고
#KEY를 처리 할려면
for key_item, value in _extend_cell_feature.items():
if value.get("column_type") == "CATEGORICAL_KEY":
category_tensor[key_item] = tf.contrib.layers.sparse_column_with_keys(column_name=key_item, keys=value.get("keys"))
category_tensor.update(continuous_tensor)
modi_set = {_v for _v in category_tensor.values()}
label_modi = tf.contrib.layers.real_valued_column("label", dtype=tf.int64)
modi_set.add(label_modi)
return modi_set #ori_set
[docs] def input_fn(self, mode, data_file, batch_size, dataconf = None):
try:
input_features = self.create_feature_columns()
features = tf.contrib.layers.create_feature_spec_for_parsing(input_features)
feature_map = tf.contrib.learn.io.read_batch_record_features(
file_pattern=[data_file],
#file_pattern=data_file,
batch_size=batch_size,
features=features,
name="read_batch_features_{}".format(mode))
target = feature_map.pop("label")
#num_epoch =
print(str(batch_size))
except Exception as e:
raise e
return feature_map, target
[docs] def input_fn2(self, mode, data_file, df, dataconf):
"""Wide & Deep Network input tensor maker
V1.0 16.11.04 Initial
:param df : dataframe from hbase
:param df, nnid
:return: tensor sparse, constraint """
try:
#self.df_validation(df, dataconf)
# remove NaN elements
_label = self.label
_label_calues = self.label_values
#df = df.dropna(how='any', axis=0)
df_test = df.dropna(how='any', axis=0)
##Make List for Continuous, Categorical Columns
CONTINUOUS_COLUMNS = []
CATEGORICAL_COLUMNS = []
##Get datadesc Continuous and Categorical infomation from Postgres nninfo
# json_string = self.get_json_by_nnid(nnid) # DATACONF
# json_object = json_string
le = LabelEncoder()
le.fit(_label_calues)
#Todo 트레이닝 하기 위해서 바꿔야함
j_feature = dataconf['cell_feature']
for cn, c_value in j_feature.items():
if c_value["column_type"] == "CATEGORICAL":
CATEGORICAL_COLUMNS.append(cn)
elif c_value["column_type"] == "CONTINUOUS":
CONTINUOUS_COLUMNS.append(cn)
elif c_value["column_type"] == "CATEGORICAL_KEY":
CATEGORICAL_COLUMNS.append(cn)
# {"data_conf": {"label": {"income_bracket": "LABEL"}, "cross_cell": {"col1": ["occupation", "education"], "col2": ["native_country", "occupation"]}, "cell_feature": {"age": {"column_type": "CONTINUOUS"}, "race": {"column_type": "CATEGORICAL"}, "gender": {"keys": ["female", "male"], "column_type": "CATEGORICAL_KEY"}, "education": {"column_type": "CATEGORICAL"}, "workclass": {"column_type": "CATEGORICAL"}, "occupation": {"column_type": "CATEGORICAL"}, "capital_gain": {"column_type": "CONTINUOUS"}, "capital_loss": {"column_type": "CONTINUOUS"}, "relationship": {"column_type": "CATEGORICAL"}, "education_num": {"column_type": "CONTINUOUS"}, "hours_per_week": {"column_type": "CONTINUOUS"}, "marital_status": {"column_type": "CATEGORICAL"}, "native_country": {"column_type": "CATEGORICAL"}}, "Transformations": {"col1": {"boundaries": [18, 25, 30, 35, 40, 45, 50, 55, 60, 65], "column_name": "age"}}}}
# Check Continuous Column is exsist?
if len(CONTINUOUS_COLUMNS) > 0:
# print(CONTINUOUS_COLUMNS)
#null 값 처리를 위해서 fillna사용
continuous_cols = {k: tf.constant(df[k].fillna(0).values) for k in CONTINUOUS_COLUMNS}
#continuous_cols = {k: tf.float32(df[k].fillna(0.).values) for k in CONTINUOUS_COLUMNS}
# Check Categorical Column is exsist?
if len(CATEGORICAL_COLUMNS) > 0:
for k in CATEGORICAL_COLUMNS:
df[k] = df[k].astype('str')
categorical_cols = {k: tf.SparseTensor(
indices=[[i, 0] for i in range(df[k].size)],
values=df[k].replace(['nan','Nan'],'').values,
dense_shape=[df[k].size, 1])
for k in CATEGORICAL_COLUMNS}
# Merges the two dictionaries into one.
feature_cols = {}
if (len(CONTINUOUS_COLUMNS) > 0):
feature_cols.update(continuous_cols)
if len(CATEGORICAL_COLUMNS) > 0:
feature_cols.update(categorical_cols)
feature_cols.pop(_label)
# dataconf
#LABEL_COLUMN = 'label'
#df[LABEL_COLUMN] = (df['income_bracket'].apply(lambda x: '>50K' in x)).astype(int)
if self.label_type == "CONTINUOUS":
df["label"] = df[_label].astype(int)
else:
#trans = le.transform([value])[0] # 무조껀 0번째임
#example.features.feature['label'].int64_list.value.extend([int(trans)])
lable_encoder_func = lambda x: le.transform([x])
df["label"] = df[_label].map(lable_encoder_func).astype(int)
#label_encode = le.transform(label_list)
label = tf.constant(df["label"].values)
return feature_cols, label
except Exception as e:
print("Error Message : {0}".format(e))
raise Exception(e)
[docs] def input_fn3(self, data_file, df, dataconf):
"""Wide & Deep Network input tensor maker
V1.0 16.11.04 Initial
:param df : dataframe from hbase
:param df, nnid
:return: tensor sparse, constraint """
try:
# remove NaN elements
_label = self.label
_label_calues = self.label_values
CONTINUOUS_COLUMNS = []
CATEGORICAL_COLUMNS = []
le = LabelEncoder()
le.fit(_label_calues)
df = df.dropna(subset=[_label],how='all')
if self.label_type == "CONTINUOUS":
df["label"] = df[_label].astype(int)
else:
lable_encoder_func = lambda x: le.transform([x])
df["label"] = df[_label].map(lable_encoder_func).astype(int)
#CATEGORICAL_COLUMNS = dict()
j_feature = dataconf['cell_feature']
for cn, c_value in j_feature.items():
if c_value["column_type"] == "CATEGORICAL":
CATEGORICAL_COLUMNS.append(cn)
#cate_u_val = c_value['column_u_values']
#cate_val = LabelEncoder()
#cate_val.fit(cate_u_val)
#df[cn] = df[cn].map(lambda x: cate_val.transform([x]))
#df[cn] = cate_val.transform(df[cn])
#_df_category_values = df[cn].values
#df[cn] = np.eye(len(cate_u_val))[_df_category_values]
#onehot_val = OneHotEncoder()
#onehot_val.fit(cate_u_val)
#df[cn] = onehot_val.transform(df[cn])
#onehotdf = pd.DataFrame
#df[cn] = pd.get_dummies(df[cn])
cate_u_val = c_value['column_u_values']
cate_val_bi = LabelBinarizer()
cate_val_bi.fit(cate_u_val)
#df[cn] = df[cn].map(lambda x: cate_val.transform([x]))
df[cn] = cate_val_bi.transform(df[cn])
print(df[cn])
if c_value["column_type"] == "CONTINUOUS":
CONTINUOUS_COLUMNS.append(cn)
df[cn] = df[cn].fillna(0)
# This makes One-Hot Encoding:
#df_dummie = pd.get_dummies(df, columns=[x for x in CATEGORICAL_COLUMNS])
#test = self.dummyEncode(CATEGORICAL_COLUMNS, df)
# This makes scaled:
#df_final = pd.DataFrame(MinMaxScaler().fit_transform(df_dummie), columns=df_dummie.columns)
#df.pop(_label)
#df.pop('SUCCESSFUL_BID_DATE')
#df.pop('YYYYMMDD')
df.reindex_axis(sorted(df.columns), axis=1)
y = df["label"].values
#df.pop("label")
X = df[df.columns.difference(["label",_label])].values
return X, y
except Exception as e:
print("Error Message : {0}".format(e))
raise Exception(e)
from sklearn.preprocessing import LabelEncoder
# Auto encodes any dataframe column of type category or object.
[docs] def dummyEncode(self, CATEGORICAL_COLUMNS, df):
#columnsToEncode = list(df.select_dtypes(include=['category', 'object']))
le = LabelEncoder()
df_onehot = pd.DataFrame()
for feature in CATEGORICAL_COLUMNS:
try:
df_onehot[feature] = le.fit_transform(df[feature])
except:
print('Error encoding ' + feature)
return df_onehot
def _convert_data_format(self, file_path, index):
"""
just pass hdf5 file chunk
:param file_path:
:param index:
:return:
"""
# try:
# h5file = h5py.File(file_path, mode='r')
# raw_data = h5file['rawdata']
# return raw_data[index.start : index.stop]
# except Exception as e:
# raise Exception(e)
# finally:
# h5file.close()
# type4 partial read
#Todo 할때마다 계속 파일을 읽는게 올바른 것인가?
try:
store = pd.HDFStore(file_path)
nrows = store.get_storer('table1').nrows
chunksize = 100
#for i in range(nrows // chunksize + 1):
chunk = store.select('table1',
start=index.start,
stop=index.stop)
except Exception as e:
raise Exception(e)
finally:
store.close()
return chunk
[docs] def data_size(self):
try:
store = pd.HDFStore(self.input_paths[self.pointer])
return store.get_storer('table1').nrows
except Exception as e:
raise Exception(e)
finally:
store.close()
[docs] def make_continuous_category_list(self,cell_feature ):
"""
Example 을 위한 Continuous 랑 Categorical을 구분하기 위한 list
"""
CONTINUOUS_COLUMNS = list()
CATEGORICAL_COLUMNS = list()
for type_columne, type_value in cell_feature.items():
if type_value["column_type"] == 'CONTINUOUS':
CONTINUOUS_COLUMNS.append(type_columne)
else:
CATEGORICAL_COLUMNS.append(type_columne)
return CONTINUOUS_COLUMNS, CATEGORICAL_COLUMNS
[docs] def add_none_keys_cate_conti_list(self,conti_list, cate_list ):
"""
Example 을 위한 Continuous 랑 Categorical을 구분하기 위한 list
"""
CALCULATED_CONTINUOUS_COLUMNS = list()
CALCULATED_CATEGORICAL_COLUMNS = list()
# , "extend_cell_feature":
# {
# "sex": {"keys": ["female", "male"], "column_type": "CATEGORICAL_KEY"}
# , "race": {"column_type": "NONE"}
# , "marital_status": {"column_type": "NONE"}
# }
_extend_cell_feature = self.extend_cell_feature
_extend_cell_feature_list = list()
_label = self.label
for _k, _v in _extend_cell_feature.items():
if _v.get("column_type") == "CATEGORICAL_KEY":
_extend_cell_feature_list.append(_k)
#_extend_cell_feature_list = list(_extend_cell_feature.keys())
_extend_cell_feature_list.append(_label)
label_extend_list = list(_extend_cell_feature_list)
for _,_key in enumerate(label_extend_list):
if _key in conti_list:
conti_list.remove(_key)
if _key in cate_list:
cate_list.remove(_key)
# for type_columne, type_value in cell_feature.items():
# if type_value["column_type"] == 'CONTINUOUS':
# CONTINUOUS_COLUMNS.append(type_columne)
# else:
# CATEGORICAL_COLUMNS.append(type_columne)
return conti_list, cate_list
[docs] def multi_queue_and_h5_print(self, file_name):
#filename = 'adult_data.tfrecords'
# Queue 는 이런식으로 설정 여기서는 쓰지 않음
#filename_queue = tf.train.string_input_producer(
# [filename], num_epochs=1)
# 꼭 local variable initial 해야함
try:
if self.multi_node_flag == True:
init_op = tf.local_variables_initializer()
# Multi Thread로 들고옴
feature_map, target = self.input_fn(tf.contrib.learn.ModeKeys.EVAL, file_name, 128)
with tf.Session() as sess:
# Start populating the filename queue.
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
tfrecord_list_row = list() # 출력을 위한 List
print_column = True
for i in range(3):
# Multi Thread에서 넣을것을 Session으로 실행
example, label = sess.run([feature_map, target])
_row = ""
if print_column == True: # Header를 위한 Column Key 설정 첫줄만
tfrecord_list_key = [col for col in example.keys()]
tfrecord_list_key.append('label')
print_column = False
tfrecord_list_row.append(tfrecord_list_key)
for i in range(len(example[list(example.keys())[0]])): # Row를 들고 오기 뭔가 지저분함
tfrecord_list_col = list()
for _k in example.keys():
if str(type(example[_k])).find('Sparse') > -1: # Sparse는 Bytes로 나와서 Bytes를 String 으로 처리
tfrecord_list_col.append(str(example[_k].values[i].decode()))
else:
# numpy도 ndarray로 나와서 [0]을 붙여 정리함
tfrecord_list_col.append(str(example[_k][i][0]))
tfrecord_list_col.append(str(label[i][0]))
columns_value = tfrecord_list_col
tfrecord_list_row.append(columns_value)
# 이쁘게 출력하기 위해 Print 함수 설정
for item in tfrecord_list_row:
print(str(item[0:])[1:-1])
coord.request_stop()
coord.join(threads)
else:
# Todo 할때마다 계속 파일을 읽는게 올바른 것인가?
try:
store = pd.HDFStore(file_name)
nrows = store.get_storer('table1').nrows
chunksize = 100
# for i in range(nrows // chunksize + 1):
chunk = store.select('table1')
print(chunk)
except Exception as e:
raise Exception(e)
finally:
store.close()
except Exception as e:
logging.error("feed just show data {0}".format(e))
def _init_node_parm(self, key):
"""
Init parameter from workflow_data_frame
:return:
"""
try :
_wf_data_conf = wf_data_conf(key)
self.label = _wf_data_conf.label
self.cell_feature= _wf_data_conf.cell_feature
self.cross_cell = _wf_data_conf.cross_cell
self.extend_cell_feature = _wf_data_conf.extend_cell_feature
self.label_values = _wf_data_conf.label_values
self.label_type = _wf_data_conf.label_type
if 'test' in self.node_name:
_wf_data_conf = wf_data_frame(key.split('_')[0] + '_' + key.split('_')[1] + '_' + 'evaldata')
self.multi_node_flag = _wf_data_conf.multi_node_flag
else :
_wf_data_conf = wf_data_frame(key.split('_')[0] + '_' + key.split('_')[1] + '_' + 'data_node')
self.multi_node_flag = _wf_data_conf.multi_node_flag
except Exception as e :
raise Exception ("WorkFlowDataFrame parms are not set " + str(e))