使用 NiFi 在 CSV 中合并/添加字段

Mergin /adding fields inside of a CSV using NiFi

我关注了 task/problem:

我得到了一个静态数据文件,该文件被推送到 bigquery table。推送时,流文件会被进一步传递。在将文件再次推入另一个 bq 之前 table 但是在将文件推入第二个 bq 之前,有几个字段需要处理某种逻辑 table.

示例: 输入文件中有 2 个字段,在推入 bq 之前应合并为一个字段。 ->具体:字段GD270A和GD270B的内容应该放在一起(字符串,两者)并放入GD260

管道在第一次推送时工作,但我缺少合并逻辑,是否有任何默认处理器或我是否需要开发自定义处理器?

current Set-up 是一个 GCP(1 个 VM 运行 NiFi,2 个 Buckets 和一个 BQ) NiFi-pipe 看起来像这样: The NiFi-pipe looks like this

非常感谢任何帮助或想法!

编辑:

CSV 的

Header: GD622|GD622PW|KZLOESCH|KZLOEDAT|GD100A|GD270A|GD270B|GD260|GD240|GD245|GD621|GD170|GD171|GD172|GD198A|GD198B|GD198C|GD198D|GD198E|GD198F|GD198G|GD455A|GD630A|GD455D|GD660| GD669|GD867A|GD205C|GD161|GD432|GD432A|GD649|FTRELEV|FTGUEAB|FTGUEBIS|GD968D|GD160|GD650A|GD630B|GD226|CUSIPNR|GD200|GD211|GD212|GD213|GD214|GD220|GD220A|GD221|GD225| GD228|GD230|GD258U|GD280A|GD290A|GD300|GD311A|GD311B|GD312|GD321|GD322|GD352|GD352A|GD400|GD481|GD545|GD636|GD685G|GD685H|GD801A|GD801AJN|GD802|GD802A|GD803E|GD804E| GD805|GD806|GD806A|GD808|GD808A|GD808B|GD808C|GD809A|GD811|GD815B|GD815C|GD821B|GD861A|GD861E|GD861F|GD862|GD910|GD910A|GD924|GD924B|GD970A|GD970I

一个示例行: ||1||1|VALL|oN||EUR|STK|EUR|U202|ZZZZ|ZZZZ|+000000000000000001.000000000|||| ||K6431|AD||||||||043|||50||+00028|4|956|36||+129|043||B|0916|SH|||||13||| ||||049||7||N||||||||||||||||||||||||

粗体字是应合并到 GD260 中的字段,如下所示:

||1||1|VALL|oN|VALLoN|EUR|STK|EUR|U202|ZZZZ|ZZZZ|+000000000000000001.000000000||||||K6431| AD||||||||043|||50||+00028|4|956|36||+129|043||B|0916|SH|||||13||||||| 049||7||N||||||||||||||||||||||||

希望这对您有所帮助,“|”显然是分隔符:)

听起来你应该可以用一个 UpdateRecord - see the Additional Details 页面来做例子

使用 Record Path ValueReplacement Value Strategy - 请参阅文档了解如何使用 Record Paths

将用户定义的 属性 添加到名称为 /GD260 且值为 concat(/GD270A, /GD270B)Update Record - 使用 concat function