Source code for cluster.common.neural_common_wdnn


import tensorflow as tf
import json
import tempfile
from django.conf import settings
from sklearn.preprocessing import LabelEncoder
import numpy as np


flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string("model_type", "wide_n_deep",
                    "Valid model types: {'wide', 'deep', 'wide_n_deep'}.")
flags.DEFINE_integer("train_steps", 10000, "Number of training steps.")
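
# The flags above can be overridden on the command line. Illustrative only,
# assuming an entry point that calls tf.app.run() to parse argv:
#   python -m cluster.common.neural_common_wdnn --model_type=deep --train_steps=20000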

"""
WDNN NETWORK COMMON CLASS
    WDNN Network needs input_fn, wdnn_build function.
"""

class NeuralCommonWdnn:
    def wdnn_build(self, model_type, nodeid, hidden_layers, activation_fn, dataconf, model_dir,
                   train=True, dememsion_auto_flag=False):
        """Wide & deep network builder.

        :param model_type: one of 'category', 'regression', 'wide', 'deep'
        :param nodeid:
        :param hidden_layers: hidden unit sizes for the DNN part
        :param activation_fn:
        :param dataconf: parsed data configuration dict
        :param model_dir: checkpoint directory of the wdnn model
        :param train: train or predict
        :param dememsion_auto_flag: if set, choose embedding dimensions automatically
        :return: tensorflow network model
        """
        try:
            hidden_layers_value = hidden_layers
            data_conf_json = dataconf  # dataconf is already a parsed dict, no json.loads needed
            _extend_cell_feature = dataconf.get('extend_cell_feature')
            _dememsion_auto_flag = dememsion_auto_flag
            featureDeepEmbedding = {}
            featureTransfomation = {}
            j_feature = data_conf_json["cell_feature"]
            _label = data_conf_json['label']
            _cell_feature_unique = dataconf.get('cell_feature_unique')

            # Build sparse layers from the CATEGORICAL columns in a single dict comprehension
            featureColumnCategorical = {cn: tf.contrib.layers.sparse_column_with_hash_bucket(cn, hash_bucket_size=1000)
                                        for cn, c_value in j_feature.items()
                                        if c_value["column_type"] == "CATEGORICAL"}

            if _label in featureColumnCategorical:
                featureColumnCategorical.pop(_label)

            # Apply the user-supplied extend_cell_feature entries:
            # first remove the user-specified items from the dict to avoid duplicates
            for _k, _v in _extend_cell_feature.items():
                featureColumnCategorical.pop(_k, None)
            for _k, _v in _extend_cell_feature.items():
                if _v.get("column_type") == "CATEGORICAL_KEY":
                    featureColumnCategorical.update(
                        {_k: tf.contrib.layers.sparse_column_with_keys(column_name=_k, keys=_v.get("keys"))})

            # Embed the categorical layers to reduce their dimensionality
            if not _dememsion_auto_flag:
                featureDeepEmbedding = {key: tf.contrib.layers.embedding_column(value, dimension=8)
                                        for key, value in featureColumnCategorical.items()}
            else:
                # dimension = round(k * n ** (1/4)), where n is the number of unique
                # values of the column and k is a small constant (here k = 3)
                auto_demension = {key: round(3 * len(_cell_feature_unique[key].get('column_u_values')) ** (1 / 4))
                                  for key, value in featureColumnCategorical.items()}
                featureDeepEmbedding = {key: tf.contrib.layers.embedding_column(value, dimension=auto_demension.get(key))
                                        for key, value in featureColumnCategorical.items()}

            # Read the CONTINUOUS columns from the JSON config
            featureColumnContinuous = {cn: tf.contrib.layers.real_valued_column(cn)
                                       for cn, c_value in j_feature.items()
                                       if c_value["column_type"] == "CONTINUOUS"}
            if _label in featureColumnContinuous:
                featureColumnContinuous.pop(_label)

            # Remove the user-specified extend_cell_feature items to avoid duplicates
            for _k, _v in _extend_cell_feature.items():
                featureColumnContinuous.pop(_k, None)

            # Transformations: bucketize continuous columns at the given boundaries
            if data_conf_json.get('Transformations'):
                featureTransfomation = {key: tf.contrib.layers.bucketized_column(
                                            featureColumnContinuous[values['column_name']], values['boundaries'])
                                        for key, values in data_conf_json['Transformations'].items()}

            # Make wide columns: the categorical columns plus the bucketized transformations
            # TODO: verify that this column selection is correct
            wide_columns = [sparseTensor for key, sparseTensor in featureColumnCategorical.items()]
            wide_columns.extend([transTensor for key, transTensor in featureTransfomation.items()])

            cross_list = list()
            if data_conf_json.get('cross_cell'):
                for key, cross_col_list in data_conf_json["cross_cell"].items():
                    cross_list_item = list()
                    for cross_col in cross_col_list:
                        if featureColumnCategorical.get(cross_col):
                            cross_list_item.extend([featureColumnCategorical[cross_col]])
                        elif featureColumnContinuous.get(cross_col):
                            cross_list_item.extend([featureColumnContinuous[cross_col]])
                        elif featureTransfomation.get(cross_col):
                            cross_list_item.extend([featureTransfomation[cross_col]])
                    cross_list.append(cross_list_item)
                wide_columns.extend(
                    [tf.contrib.layers.crossed_column(cross_col, hash_bucket_size=int(1e4))
                     for cross_col in cross_list])

            # Make deep columns: the continuous columns plus the categorical embeddings
            deep_columns = [realTensor for key, realTensor in featureColumnContinuous.items()]
            deep_columns.extend([embedTensor for key, embedTensor in featureDeepEmbedding.items()])

            if model_type == "category":
                m = tf.contrib.learn.DNNLinearCombinedClassifier(
                    model_dir=model_dir,
                    linear_feature_columns=wide_columns,
                    dnn_feature_columns=deep_columns,
                    # n_classes=2,  # omitted because of a TF 0.11 bug
                    dnn_hidden_units=hidden_layers_value)
            elif model_type == "regression":
                m = tf.contrib.learn.DNNLinearCombinedRegressor(
                    model_dir=model_dir,
                    linear_feature_columns=wide_columns,
                    dnn_feature_columns=deep_columns,
                    dnn_hidden_units=hidden_layers_value)
            elif model_type == "wide":
                m = tf.contrib.learn.LinearClassifier(model_dir=model_dir,
                                                      feature_columns=wide_columns,
                                                      enable_centered_bias=True)
            elif model_type == "deep":
                m = tf.contrib.learn.DNNClassifier(model_dir=model_dir,
                                                   feature_columns=deep_columns,
                                                   # n_classes=label_cnt,  # omitted because of a TF 0.11 bug
                                                   hidden_units=hidden_layers_value)
            else:
                raise ValueError("Unknown model_type: {0}".format(model_type))
            return m
        except Exception as e:
            print("Error Message : {0}".format(e))
            raise Exception(e)
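    # Worked example of the automatic embedding-dimension heuristic above,
    # dimension = round(k * n ** (1/4)) with k = 3, as described in the comment
    # inside wdnn_build (the column sizes here are illustrative):
    #   n = 16   unique values  ->  round(3 * 16 ** (1 / 4))   = 6
    #   n = 1000 unique values  ->  round(3 * 1000 ** (1 / 4)) = 17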
    def df_validation(self, df, dataconf):
        """Check that every value in the age column is an integer."""
        print("age df_validation start")
        for df_value in df["age"].values:
            if not np.issubdtype(type(df_value), np.integer):
                print("age integer check failed " + str(df_value))
    def input_fn(self, df, nnid, dataconf):
        """Wide & Deep Network input tensor maker.

        V1.0  16.11.04  Initial

        :param df: dataframe from hbase
        :param nnid:
        :param dataconf: parsed data configuration dict
        :return: dict of feature tensors and a label constant tensor
        """
        try:
            self.df_validation(df, dataconf)

            # Remove NaN elements
            df = df.dropna(how='any', axis=0)

            # Build the lists of continuous and categorical columns from the data
            # description (cell_feature), originally read from the Postgres nninfo
            CONTINUOUS_COLUMNS = []
            CATEGORICAL_COLUMNS = []
            j_feature = dataconf['cell_feature']
            for cn, c_value in j_feature.items():
                if c_value["column_type"] == "CATEGORICAL":
                    CATEGORICAL_COLUMNS.append(cn)
                elif c_value["column_type"] == "CONTINUOUS":
                    CONTINUOUS_COLUMNS.append(cn)
                elif c_value["column_type"] == "CATEGORICAL_KEY":
                    CATEGORICAL_COLUMNS.append(cn)

            # Example dataconf:
            # {"label": {"income_bracket": "LABEL"},
            #  "cross_cell": {"col1": ["occupation", "education"],
            #                 "col2": ["native_country", "occupation"]},
            #  "cell_feature": {"age": {"column_type": "CONTINUOUS"},
            #                   "race": {"column_type": "CATEGORICAL"},
            #                   "gender": {"keys": ["female", "male"],
            #                              "column_type": "CATEGORICAL_KEY"}, ...},
            #  "Transformations": {"col1": {"boundaries": [18, 25, 30, 35, 40, 45, 50, 55, 60, 65],
            #                               "column_name": "age"}}}

            # Check whether any continuous columns exist
            if len(CONTINUOUS_COLUMNS) > 0:
                continuous_cols = {k: tf.constant(df[k].values) for k in CONTINUOUS_COLUMNS}

            # Check whether any categorical columns exist
            if len(CATEGORICAL_COLUMNS) > 0:
                for k in CATEGORICAL_COLUMNS:
                    df[k] = df[k].astype('str')
                categorical_cols = {k: tf.SparseTensor(
                                        indices=[[i, 0] for i in range(df[k].size)],
                                        values=df[k].values,
                                        dense_shape=[df[k].size, 1])
                                    for k in CATEGORICAL_COLUMNS}

            # Merge the two dictionaries into one
            feature_cols = {}
            if len(CONTINUOUS_COLUMNS) > 0:
                feature_cols.update(continuous_cols)
            if len(CATEGORICAL_COLUMNS) > 0:
                feature_cols.update(categorical_cols)

            # NOTE: the label is currently hard-coded to the census
            # 'income_bracket' column (1 if the value contains '>50K')
            LABEL_COLUMN = 'label'
            df[LABEL_COLUMN] = (df['income_bracket'].apply(lambda x: '>50K' in x)).astype(int)
            label = tf.constant(df[LABEL_COLUMN].values)

            return feature_cols, label
        except Exception as e:
            print("Error Message : {0}".format(e))
            raise Exception(e)
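
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the module). It assumes a pandas DataFrame
# `df` in the census format that input_fn expects, a parsed `dataconf` dict in
# the format shown in the example comment above, and placeholder values for
# nodeid, nnid and model_dir:
#
#   import pandas as pd
#
#   wdnn = NeuralCommonWdnn()
#   dataconf = {...}          # parsed JSON config (see the example above)
#   df = pd.read_csv(...)     # training data with the columns named in dataconf
#
#   m = wdnn.wdnn_build(model_type="category", nodeid="node01",
#                       hidden_layers=[100, 50], activation_fn=tf.nn.relu,
#                       dataconf=dataconf, model_dir="/tmp/wdnn_model")
#   m.fit(input_fn=lambda: wdnn.input_fn(df, "nn0001", dataconf),
#         steps=FLAGS.train_steps)
# ---------------------------------------------------------------------------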