Kafka Streams:按 Json 日志中的键分组
Kafka Streams: Grouping by a key in Json log
我有一个带有输入主题 input
的 kafka Streams 应用程序,以下记录作为 json 日志出现:
JSON 日志:
{"CreationTime":"2018-02-12T12:32:31","UserId":"abc@gmail.com","Operation":"upload","Workload":"Drive"}
我正在从以下主题构建流:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
接下来我想分组 "UserId"
并找到每个用户的计数。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
final KTable<String, Long> wordCounts = source_user_activity
.flatMap((key, value) -> {
List<KeyValue<String, String>> result = new LinkedList<>();
JSONObject valueObject = new JSONObject(value);
result.add(KeyValue.pair((valueObject.get("UserId").toString()), valueObject.toString()));
return result;
})
.groupByKey()
.count();
wordCounts.toStream().to("output",Produced.with(stringSerde, longSerde));
wordCounts.print();
接下来,我使用 console-consumer
从 output
主题中获取记录。我没有看到任何文字,它只是这样的:
然而 wordCounts.print()
显示:
[KSTREAM-AGGREGATE-0000000003]: abc@gmail.com, (1<-null)
我在这里做错了什么?谢谢。
value的数据默认编码为long
(你用的是LongSerde
)console consumer users默认编码为StringDeserializer
,因此无法正确反序列化值。
您需要通过命令行参数为控制台使用者指定 LongDeserializer
值。
我有一个带有输入主题 input
的 kafka Streams 应用程序,以下记录作为 json 日志出现:
JSON 日志:
{"CreationTime":"2018-02-12T12:32:31","UserId":"abc@gmail.com","Operation":"upload","Workload":"Drive"}
我正在从以下主题构建流:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
接下来我想分组 "UserId"
并找到每个用户的计数。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
final KTable<String, Long> wordCounts = source_user_activity
.flatMap((key, value) -> {
List<KeyValue<String, String>> result = new LinkedList<>();
JSONObject valueObject = new JSONObject(value);
result.add(KeyValue.pair((valueObject.get("UserId").toString()), valueObject.toString()));
return result;
})
.groupByKey()
.count();
wordCounts.toStream().to("output",Produced.with(stringSerde, longSerde));
wordCounts.print();
接下来,我使用 console-consumer
从 output
主题中获取记录。我没有看到任何文字,它只是这样的:
然而 wordCounts.print()
显示:
[KSTREAM-AGGREGATE-0000000003]: abc@gmail.com, (1<-null)
我在这里做错了什么?谢谢。
value的数据默认编码为long
(你用的是LongSerde
)console consumer users默认编码为StringDeserializer
,因此无法正确反序列化值。
您需要通过命令行参数为控制台使用者指定 LongDeserializer
值。