ExecutionScript 输出两个不同的流文件 NIFI
ExecutionScript output two different flowfiles NIFI
我将 executionScript 与 python 一起使用,我有一个数据集,它可能有一些损坏的数据,我的想法是处理好的数据,并将其放入我的流文件内容中以取得成功关系和损坏的关系在失败关系中重定向它们,我做了这样的事情:
for msg in messages :
try :
id = msg['id']
timestamp = msg['time']
value_encoded = msg['data']
hexFrameType = '0x'+value_encoded[0:2]
matches = re.match(regex,value_encoded)
....
except:
error_catched.append(msg)
pass
知道我该怎么做吗?
出于此答案的目的,我假设您有一个名为 "flowFile" 的传入流文件,您是从 session.get() 获得的。如果您只是想检查 flowFile 的内容,然后根据发生的错误将其路由到成功或失败,那么在您的成功路径中,您可以使用:
session.transfer(flowFile, REL_SUCCESS)
在你的错误路径中你可以这样做:
session.transfer(flowFile, REL_FAILURE)
如果您想要新文件(可能在上面的循环中包含一个 "msg"),您可以使用:
outputFlowFile = session.create(flowFile)
使用输入流文件作为父流文件创建新的流文件。如果要写入新流文件,可以使用 my blog post.
中描述的 PyStreamCallback 技术
如果您创建新的流文件,请务必使用上述 session.transfer() 调用将其最新版本传输到 REL_SUCCESS 或 REL_FAILURE(但使用 outputFlowFile而不是流文件)。此外,您还需要删除传入的流文件(因为您已经从中创建了子流文件并转移了这些文件)。为此,您可以使用:
session.remove(flowFile)
我将 executionScript 与 python 一起使用,我有一个数据集,它可能有一些损坏的数据,我的想法是处理好的数据,并将其放入我的流文件内容中以取得成功关系和损坏的关系在失败关系中重定向它们,我做了这样的事情:
for msg in messages :
try :
id = msg['id']
timestamp = msg['time']
value_encoded = msg['data']
hexFrameType = '0x'+value_encoded[0:2]
matches = re.match(regex,value_encoded)
....
except:
error_catched.append(msg)
pass
知道我该怎么做吗?
出于此答案的目的,我假设您有一个名为 "flowFile" 的传入流文件,您是从 session.get() 获得的。如果您只是想检查 flowFile 的内容,然后根据发生的错误将其路由到成功或失败,那么在您的成功路径中,您可以使用:
session.transfer(flowFile, REL_SUCCESS)
在你的错误路径中你可以这样做:
session.transfer(flowFile, REL_FAILURE)
如果您想要新文件(可能在上面的循环中包含一个 "msg"),您可以使用:
outputFlowFile = session.create(flowFile)
使用输入流文件作为父流文件创建新的流文件。如果要写入新流文件,可以使用 my blog post.
中描述的 PyStreamCallback 技术如果您创建新的流文件,请务必使用上述 session.transfer() 调用将其最新版本传输到 REL_SUCCESS 或 REL_FAILURE(但使用 outputFlowFile而不是流文件)。此外,您还需要删除传入的流文件(因为您已经从中创建了子流文件并转移了这些文件)。为此,您可以使用:
session.remove(flowFile)