流应用程序中关于数据格式转换为 json 的 kafka 管道的修改
modification in streams app regarding data format conversion to json for a kafka pipeline
``我正在设计一个管道,让生产者将随机句子发布到输入的 kafka 主题。接下来,是我的 windowed 字数统计流应用程序,它获取输入数据并对它们执行操作并获得 windowed 字数统计(Tumbling window of 5)。
问题 这里 是
在输出主题中通过消费者控制台查看的输出如下:
美国广播公司 1
美国广播公司 2
堆栈 1
溢出 2
溢出 3
...等等
我想要的输出格式:
{“单词”:“abc”,“计数”:1}
现在我需要使用 kafka connect 将其发送到 elasticsearch。其他一切正常。只有一个序列化错误,因为 elasticsearch 接受 json 格式的数据。
所以我希望在我的流对其进行操作后以 json 格式输出数据 我如何实现它。我完全卡住了。我需要在流应用程序本身中进行转换。附加下面的流应用程序以进行更改。请帮忙
KStream<String, String> initialstream = builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> Tstream = initialstream.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")));
KGroupedStream<String, String> TgroupedStream = Tstream
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> Ttable = TgroupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
Ttable
.toStream()
.selectKey((key, word) -> key.key())
.to("pipeoutput", Produced.with(Serdes.String(), Serdes.Long()));
在 .to()
之前,您需要 .map()
将您的键值对转换为与您希望发送给下游消费者的格式相匹配的预期输出
您还必须将 Produced.with 值 serde 更改为 Long,因为您要编写 JSON String
``我正在设计一个管道,让生产者将随机句子发布到输入的 kafka 主题。接下来,是我的 windowed 字数统计流应用程序,它获取输入数据并对它们执行操作并获得 windowed 字数统计(Tumbling window of 5)。
问题 这里 是
在输出主题中通过消费者控制台查看的输出如下:
美国广播公司 1
美国广播公司 2
堆栈 1
溢出 2
溢出 3
...等等
我想要的输出格式:
{“单词”:“abc”,“计数”:1}
现在我需要使用 kafka connect 将其发送到 elasticsearch。其他一切正常。只有一个序列化错误,因为 elasticsearch 接受 json 格式的数据。
所以我希望在我的流对其进行操作后以 json 格式输出数据 我如何实现它。我完全卡住了。我需要在流应用程序本身中进行转换。附加下面的流应用程序以进行更改。请帮忙
KStream<String, String> initialstream = builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> Tstream = initialstream.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")));
KGroupedStream<String, String> TgroupedStream = Tstream
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> Ttable = TgroupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
Ttable
.toStream()
.selectKey((key, word) -> key.key())
.to("pipeoutput", Produced.with(Serdes.String(), Serdes.Long()));
在 .to()
之前,您需要 .map()
将您的键值对转换为与您希望发送给下游消费者的格式相匹配的预期输出
您还必须将 Produced.with 值 serde 更改为 Long,因为您要编写 JSON String