Nifi-1.0.0 - 同步机制
Nifi-1.0.0 - synchronization mechanism
NiFi 是否有同步机制以便知道什么时候完成处理?
我提取了一些数据,进行了一些处理,在第 N-1 步,我想知道所有数据都已处理,以便继续(最终)第 N 步。
[GetFile / 1000 000 行] ----> [Proc1 / 处理步骤 0] -----> [Proc2 / 处理步骤 1] .... [PutSQL / 插入数据库] -- -> [让我知道我已经在 table 中插入所有数据的过程] ----> [例如,ProcN / 运行 聚合数据]
NiFi 并没有真正内置到框架中的显式同步功能,但一些处理器具有有助于同步的功能 activity。我可以想到几种可能的方法来使您的流程正常工作:
调度 - 您可以在处理器上使用 CRON 调度来调度 GetFile 和稍后的聚合操作,假设操作的持续时间相对可预测。
MonitorActivity - MonitorActivity 处理器可以根据队列中的 inactivity 触发流文件。您可以在 PutSQL 的下游使用它,并在插入停止并且聚合应该开始时触发。
MergeContent(简单) - MergeContent 处理器可能会将 PutSQL 的结果聚合到触发聚合操作的单个消息中。您必须尝试 bin 大小和年龄的属性才能使其正常工作。
MergeContent(碎片整理) - MergeContent 有一个碎片整理策略,旨在将较大文件的碎片关联在一起。它需要在流文件上设置特定属性,请参阅文档底部的 "Reads Attributes" 部分。该行为似乎接近您想要的,但设置这些片段属性可能很困难。
我可能有一个建议供您尝试。 NiFi 有一个很好的 API 允许你启动和停止处理器。您可以使用 InvokeHTTP 处理器从 NiFi 中调用此 API。这允许您启动 [例如 ProcN / 运行 聚合数据 ] 并在它 运行 之后再次关闭。您必须确保此处理器不会连续 运行。所以你的处理器将是:
GetFile / 1000 000 lines] ----> [ Proc1 / process step 0 ] -----> [ Proc2 / process step 1 ] .... [ PutSQL / insert into db ] ---> [ Proc to let me know that I've inserted all the data in the table ] ----> -----> [ InvokeHTTP to start ProcN / Run Aggregates ] --via API call--> [ ProcN / Run aggregates on data for example ] -----> [ InvokeHTTP to stop ProcN / Run Aggregates ]
我们正在研究这种同步请求的方法 - 向远程方回复消息并防止管道中有太多消息。
NiFi 是否有同步机制以便知道什么时候完成处理?
我提取了一些数据,进行了一些处理,在第 N-1 步,我想知道所有数据都已处理,以便继续(最终)第 N 步。
[GetFile / 1000 000 行] ----> [Proc1 / 处理步骤 0] -----> [Proc2 / 处理步骤 1] .... [PutSQL / 插入数据库] -- -> [让我知道我已经在 table 中插入所有数据的过程] ----> [例如,ProcN / 运行 聚合数据]
NiFi 并没有真正内置到框架中的显式同步功能,但一些处理器具有有助于同步的功能 activity。我可以想到几种可能的方法来使您的流程正常工作:
调度 - 您可以在处理器上使用 CRON 调度来调度 GetFile 和稍后的聚合操作,假设操作的持续时间相对可预测。
MonitorActivity - MonitorActivity 处理器可以根据队列中的 inactivity 触发流文件。您可以在 PutSQL 的下游使用它,并在插入停止并且聚合应该开始时触发。
MergeContent(简单) - MergeContent 处理器可能会将 PutSQL 的结果聚合到触发聚合操作的单个消息中。您必须尝试 bin 大小和年龄的属性才能使其正常工作。
MergeContent(碎片整理) - MergeContent 有一个碎片整理策略,旨在将较大文件的碎片关联在一起。它需要在流文件上设置特定属性,请参阅文档底部的 "Reads Attributes" 部分。该行为似乎接近您想要的,但设置这些片段属性可能很困难。
我可能有一个建议供您尝试。 NiFi 有一个很好的 API 允许你启动和停止处理器。您可以使用 InvokeHTTP 处理器从 NiFi 中调用此 API。这允许您启动 [例如 ProcN / 运行 聚合数据 ] 并在它 运行 之后再次关闭。您必须确保此处理器不会连续 运行。所以你的处理器将是:
GetFile / 1000 000 lines] ----> [ Proc1 / process step 0 ] -----> [ Proc2 / process step 1 ] .... [ PutSQL / insert into db ] ---> [ Proc to let me know that I've inserted all the data in the table ] ----> -----> [ InvokeHTTP to start ProcN / Run Aggregates ] --via API call--> [ ProcN / Run aggregates on data for example ] -----> [ InvokeHTTP to stop ProcN / Run Aggregates ]
我们正在研究这种同步请求的方法 - 向远程方回复消息并防止管道中有太多消息。