NiFi, flow with KafkaConsumer 写成 json

NiFi, flow with KafkaConsumer to write as json

目前我遇到了以下问题: 我正在使用 KafkaConsumer 阅读来自 Kafka 主题的消息。消息是字符串,格式如下: { "a" : "b", "a1" : "b1", "c2" : "c3" } 它们保存在 FlowFile 的有效负载中。

我想将该字符串转换为 json 或理想情况下转换为 csv,但不知道该怎么做。

我是 NiFi 的新手并尽可能多地进行了研究,但我找到的答案是关于从 json 到 avro 或类似的转换,但从不转换为 json 或 avro。 我还发现 Kafka 消息在 FlowFile 的有效负载中,而不是在属性中,所以我不知道如何获取它,因为示例总是涉及属性。

简而言之:我可以使用一些内置处理器将 FlowFile 的有效负载(一个字符串)转换为 json/cvs。

如果您的消息在 FlowFile 中,以下顺序可能会有所帮助:

1) 使用 AttributesToJson 将有效载荷消息转换为 Json。 2) 使用 EvaluateJsonPath 提取有效负载消息。在你的情况下是 kafka 消息。然后您可以传递提取的消息以生成 csv。

这个 post 可以帮助将 Json 转换为 CSV:Convert Json To CSV

我最终这样做了:

  1. ConsumeKafka 给我字符串:

{ "a" : "b", "a1" : "b1" }

  1. EvaluateJsonPath 通过添加属性创建属性

a -> $.a //results in attribute named a with value b

a1 -> $.a1 //results in attribute named a1 with value b1

  1. ReplaceText 从 EvaluateJsonPath 获取属性以形成一个单一的 csv 格式:

Replacement value -> ${'a'},${'a1'}

结果是单行,但没有换行

b,b1

添加新行附加 \n, '\n', "\n" 没有 工作。 有效方法替换值 字段中输入时按 Shift+Enter,这导致创建了一个空新行。