Source code for cluster.preprocess.pre_node_feed
from cluster.preprocess.pre_node import PreProcessNode
import os
class PreNodeFeed(PreProcessNode):
    """
    Feed node that hands out, one entry at a time, the data loaded from an
    upstream data node.

    ``run`` locates the upstream data node and checks that a downstream
    consumer node exists, then stores the loaded entries in
    ``self.input_paths``.  ``self.pointer`` is the index of the current entry;
    it is advanced with ``next`` and rewound with ``reset_pointer``.

    ``run`` must be called before the other accessors: it is the only place in
    this class where ``pointer`` and ``input_paths`` are assigned.
    """

    def run(self, conf_data):
        """
        Resolve the neighbouring nodes and load the list of inputs to feed.

        :param conf_data: run configuration; not read by this implementation
        :return: None
        :raises Exception: when no suitable upstream data node or no
            downstream netconf/eval node is connected
        """
        self.pointer = 0
        data_node_cls = None
        netconf_node_cls = None

        # A previous node qualifies as the data source when it belongs to the
        # 'data' or 'dataconf' group, or when its type is 'pre_merge'.
        # When several nodes qualify, the last one in the list wins.
        for prev_node in self.get_prev_node():
            node_grp = prev_node.get_node_grp()
            node_type = prev_node.get_node_type()
            if node_grp in ('data', 'dataconf') or node_type == 'pre_merge':
                data_node_cls = prev_node
        # Identity test: '== None' would go through a node class's __eq__.
        if data_node_cls is None:
            raise Exception("data node must be needed to use feed node")

        # The consumer node is only checked for existence here; it is not
        # used any further in this method.
        for next_node in self.get_next_node():
            if next_node.get_node_grp() in ('netconf', 'eval'):
                netconf_node_cls = next_node
        if netconf_node_cls is None:
            raise Exception("netconf node must be needed to use feed node")

        self.input_paths = data_node_cls.load_data(
            data_node_cls.get_node_name(), parm='all')

    def _init_node_parm(self, node_id):
        """
        No-op in this class (presumably a hook for subclasses - confirm).

        :param node_id: unused here
        :return: None
        """
        pass

    def _set_progress_state(self):
        """
        No-op in this class (presumably a hook for subclasses - confirm).

        :return: None
        """
        pass

    def has_next(self) -> bool:
        """
        Check whether the pointer still refers to an entry of ``input_paths``
        (the original notes describe these entries as hdf5 files).

        :return: True while ``pointer`` is smaller than the number of inputs
        """
        return len(self.input_paths) > self.pointer

    def reset_pointer(self) -> None:
        """
        Rewind the pointer to the first input.

        :return: None
        """
        self.pointer = 0

    def next(self) -> None:
        """
        Move the pointer forward by one; does nothing once it is past the
        last input.

        :return: None
        """
        if self.has_next():
            self.pointer += 1

    def file_size(self) -> int:
        """
        :return: number of entries in ``input_paths``
        """
        return len(self.input_paths)

    def __getitem__(self, key):
        """
        Convert the input the pointer currently refers to.

        :param key: forwarded unchanged to ``_convert_data_format`` as its
            ``index`` argument
        :return: whatever ``_convert_data_format`` returns (None in this class)
        :raises IndexError: presumably, when the pointer has moved past the
            last input - depends on the container returned by ``load_data``
        """
        return self._convert_data_format(self.input_paths[self.pointer], key)

    def _convert_data_format(self, obj, index):
        """
        No-op in this class; returns None (presumably overridden by
        subclasses to convert one input - confirm).

        :param obj: the current entry of ``input_paths``
        :param index: the key passed to ``__getitem__``
        :return: None
        """
        pass

    def data_size(self):
        """
        No-op in this class; returns None (presumably overridden by
        subclasses - confirm).

        :return: None
        """
        pass