NiFi, flow with KafkaConsumer 写成 json
NiFi, flow with KafkaConsumer to write as json
目前我遇到了以下问题:
我正在使用 KafkaConsumer 阅读来自 Kafka 主题的消息。消息是字符串,格式如下:
{ "a" : "b", "a1" : "b1", "c2" : "c3" }
它们保存在 FlowFile 的有效负载中。
我想将该字符串转换为 json 或理想情况下转换为 csv,但不知道该怎么做。
我是 NiFi 的新手并尽可能多地进行了研究,但我找到的答案是关于从 json 到 avro 或类似的转换,但从不转换为 json 或 avro。
我还发现 Kafka 消息在 FlowFile 的有效负载中,而不是在属性中,所以我不知道如何获取它,因为示例总是涉及属性。
简而言之:我可以使用一些内置处理器将 FlowFile 的有效负载(一个字符串)转换为 json/cvs。
如果您的消息在 FlowFile 中,以下顺序可能会有所帮助:
1) 使用 AttributesToJson 将有效载荷消息转换为 Json。
2) 使用 EvaluateJsonPath 提取有效负载消息。在你的情况下是 kafka 消息。然后您可以传递提取的消息以生成 csv。
这个 post 可以帮助将 Json 转换为 CSV:Convert Json To CSV
我最终这样做了:
- ConsumeKafka 给我字符串:
{ "a" : "b", "a1" : "b1" }
- EvaluateJsonPath 通过添加属性创建属性
a -> $.a //results in attribute named a with value b
a1 -> $.a1 //results in attribute named a1 with value b1
- ReplaceText 从 EvaluateJsonPath 获取属性以形成一个单一的 csv 格式:
Replacement value -> ${'a'},${'a1'}
结果是单行,但没有换行:
b,b1
添加新行附加 \n, '\n', "\n" 没有 工作。
有效方法 在 替换值 字段中输入时按 Shift+Enter,这导致创建了一个空新行。
目前我遇到了以下问题:
我正在使用 KafkaConsumer 阅读来自 Kafka 主题的消息。消息是字符串,格式如下:
{ "a" : "b", "a1" : "b1", "c2" : "c3" }
它们保存在 FlowFile 的有效负载中。
我想将该字符串转换为 json 或理想情况下转换为 csv,但不知道该怎么做。
我是 NiFi 的新手并尽可能多地进行了研究,但我找到的答案是关于从 json 到 avro 或类似的转换,但从不转换为 json 或 avro。 我还发现 Kafka 消息在 FlowFile 的有效负载中,而不是在属性中,所以我不知道如何获取它,因为示例总是涉及属性。
简而言之:我可以使用一些内置处理器将 FlowFile 的有效负载(一个字符串)转换为 json/cvs。
如果您的消息在 FlowFile 中,以下顺序可能会有所帮助:
1) 使用 AttributesToJson 将有效载荷消息转换为 Json。 2) 使用 EvaluateJsonPath 提取有效负载消息。在你的情况下是 kafka 消息。然后您可以传递提取的消息以生成 csv。
这个 post 可以帮助将 Json 转换为 CSV:Convert Json To CSV
我最终这样做了:
- ConsumeKafka 给我字符串:
{ "a" : "b", "a1" : "b1" }
- EvaluateJsonPath 通过添加属性创建属性
a -> $.a //results in attribute named a with value b
a1 -> $.a1 //results in attribute named a1 with value b1
- ReplaceText 从 EvaluateJsonPath 获取属性以形成一个单一的 csv 格式:
Replacement value -> ${'a'},${'a1'}
结果是单行,但没有换行:
b,b1
添加新行附加 \n, '\n', "\n" 没有 工作。 有效方法 在 替换值 字段中输入时按 Shift+Enter,这导致创建了一个空新行。