Kafka Streams:按 Json 日志中的键分组

Kafka Streams: Grouping by a key in Json log

我有一个带有输入主题 input 的 kafka Streams 应用程序,以下记录作为 json 日志出现:

JSON 日志:

{"CreationTime":"2018-02-12T12:32:31","UserId":"abc@gmail.com","Operation":"upload","Workload":"Drive"}

我正在从以下主题构建流:

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");

接下来我想分组 "UserId" 并找到每个用户的计数。

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");

final KTable<String, Long> wordCounts = source_user_activity
        .flatMap((key, value) -> {
            List<KeyValue<String, String>> result = new LinkedList<>();
            JSONObject valueObject = new JSONObject(value);
            result.add(KeyValue.pair((valueObject.get("UserId").toString()), valueObject.toString()));
            return result;
        })
        .groupByKey()
        .count();

wordCounts.toStream().to("output",Produced.with(stringSerde, longSerde));
wordCounts.print();

接下来,我使用 console-consumeroutput 主题中获取记录。我没有看到任何文字,它只是这样的:

然而 wordCounts.print() 显示:

[KSTREAM-AGGREGATE-0000000003]: abc@gmail.com, (1<-null)

我在这里做错了什么?谢谢。

value的数据默认编码为long(你用的是LongSerde)console consumer users默认编码为StringDeserializer,因此无法正确反序列化值。

您需要通过命令行参数为控制台使用者指定 LongDeserializer 值。