如何等待 GenerateTableFetch 查询完成
How to wait for GenerateTableFetch queries to finish
我的用例是这样的。我有一些 X table 需要从 MySQL 中提取。我使用 SplitText
拆分它们,将每个 table 放入单独的流文件中,并使用 GenerateTableFetch
和 ExecuteSQL
拉取。
我希望在所有 table 的导入完成后收到通知或执行其他操作。在 SplitText
文本处理器中,我已将 original
关系路由到 ${filename}
上的 Wait
,目标计数为 ${fragment.count}
。这将跟踪完成了多少 table。
但现在我不知道如何知道特定的 table 何时完成。 GenerateTableFetch
根据分区大小将流文件分成多个。但它不会写像 fragment.count 这样的属性,我可以用它来等待每个 table.
有什么办法可以做到这一点吗?或者有没有办法在整个流程结束时知道流程中的所有流程文件是否都已处理并且没有任何内容在队列中或正在处理?
如果您有一个独立的 NiFi 实例(或者没有在集群之间将流文件分发到 ExecuteSQL 节点),那么您可以改用 QueryDatabaseTable,它(默认情况下)只会在整个过程中发出所有流文件结果集被处理。如果您将所有行都放入单个流文件中,那么流文件已传输到下游这一事实表明提取已完成。
我已经写了 NIFI-5601 来介绍为 GTF 生成的流文件添加 fragment.* 属性的改进。
直到 NiFi 添加对此的支持,我设法使用 MergeContent
使其工作。使用 table_name 作为 Correlation attribute name
,然后使用 merged
与 Wait
处理器的关系,使用 ${merge.count}
作为目标。如果有人想做同样的事情,请参考屏幕截图。
我的用例是这样的。我有一些 X table 需要从 MySQL 中提取。我使用 SplitText
拆分它们,将每个 table 放入单独的流文件中,并使用 GenerateTableFetch
和 ExecuteSQL
拉取。
我希望在所有 table 的导入完成后收到通知或执行其他操作。在 SplitText
文本处理器中,我已将 original
关系路由到 ${filename}
上的 Wait
,目标计数为 ${fragment.count}
。这将跟踪完成了多少 table。
但现在我不知道如何知道特定的 table 何时完成。 GenerateTableFetch
根据分区大小将流文件分成多个。但它不会写像 fragment.count 这样的属性,我可以用它来等待每个 table.
有什么办法可以做到这一点吗?或者有没有办法在整个流程结束时知道流程中的所有流程文件是否都已处理并且没有任何内容在队列中或正在处理?
如果您有一个独立的 NiFi 实例(或者没有在集群之间将流文件分发到 ExecuteSQL 节点),那么您可以改用 QueryDatabaseTable,它(默认情况下)只会在整个过程中发出所有流文件结果集被处理。如果您将所有行都放入单个流文件中,那么流文件已传输到下游这一事实表明提取已完成。
我已经写了 NIFI-5601 来介绍为 GTF 生成的流文件添加 fragment.* 属性的改进。
直到 NiFi 添加对此的支持,我设法使用 MergeContent
使其工作。使用 table_name 作为 Correlation attribute name
,然后使用 merged
与 Wait
处理器的关系,使用 ${merge.count}
作为目标。如果有人想做同样的事情,请参考屏幕截图。