创建 Kafka Stream 以计算值的数量
Create Kafka Stream for count amount from value
我正在生成如下数据:
Key: "Mike", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "John", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "Mike", value: {"amount":50,"time":"2021-11-05T07:53:32.005751Z"}
键是字符串(像爱丽丝、约翰...这样的名字)。
例如我需要结果:
{"Mike": 2}
{"John": 1}
或
{"key":"Mike", "count": 2}
{"key":"John", "count": 1}
接下来我尝试了:
/**
 * Builds a topology that counts how many records arrive per key on the
 * {@code bank-transactions} topic and publishes the running totals to
 * {@code person-transaction-frequency}.
 *
 * <p>The counts are written with {@code Serdes.Long()}, i.e. as binary longs rather than
 * text. A consumer that decodes the values as strings (for example a console consumer with
 * its default settings) will print unreadable bytes; it has to be configured with a
 * {@code LongDeserializer} for the value.
 *
 * @return the topology, ready to be passed to {@code KafkaStreams}
 */
public Topology createTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    // JSON serde used for the transaction payload.
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<String, JsonNode> transactions =
        builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));

    // count() never looks at the value, so the records are grouped as they are.
    // The previous map(...) step kept the key unchanged but still flagged the stream for
    // repartitioning, and it threw a NullPointerException for any record without an
    // "amount" field. Grouping the source stream directly avoids both problems.
    KTable<String, Long> transactionCounts = transactions
        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
        .count();

    transactionCounts.toStream()
        .to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));

    return builder.build();
}
/**
 * Application entry point: configures Kafka Streams, starts the topology produced by
 * {@code createTopology()}, and registers a JVM shutdown hook that closes the streams
 * instance.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final Properties props = new Properties();

    // Identity and connection settings.
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:29092");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-favorite-amount-application");

    // Fallback serdes for operations that do not specify their own.
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // Offset reset policy handed to the underlying consumer.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final KafkaStreams kafkaStreams =
        new KafkaStreams(new Mc4CalculateFavoriteAmount().createTopology(), props);
    kafkaStreams.start();

    // Close the streams application cleanly when the JVM shuts down.
    final Thread closer = new Thread(() -> kafkaStreams.close());
    Runtime.getRuntime().addShutdownHook(closer);
}
我正在尝试按名称(键)对消息进行计数。但是我在输出主题中得到了乱码(artifacts):
Welcome to Stack Overflow! You seem to be asking for someone to write some code for you. Stack Overflow is a question and answer site, not a code-writing service. Please see here to learn how to write effective questions.
您可以根据您的用例调整此 official Confluent Example。这个例子和你问的很相似。
为了进一步解释这一点,您需要创建一个流应用程序,您可以在其中将主题中的数据读入 KStream。您尚未提供有关您的键(key)的信息。在 Confluent 示例中,记录使用 map() 方法显式重新分区,为每条记录创建一个新的 KeyValue 实例(您可以使用 amount 作为键)。然后事件按键分组并计数。
如果您只想对键进行计数,则可以舍弃整个值并将其替换为 1,以表示每个看到的键。
// Read the payload as opaque bytes: only the key matters for counting.
KStream<String, Bytes> textLines = builder.stream(
    "bank-transactions",
    Consumed.with(Serdes.String(), Serdes.Bytes()));

// Replace every value with the constant 1 so each record just marks one occurrence of its key.
KStream<String, Long> occurrences = textLines.mapValues(ignored -> 1L);

// Group by the existing key and count the records per key.
KTable<String, Long> wordCounts = occurrences
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
    .count();

// Publish every count update to the output topic.
KStream<String, Long> countUpdates = wordCounts.toStream();
countUpdates.to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));
我正在生成如下数据:
Key: "Mike", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "John", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "Mike", value: {"amount":50,"time":"2021-11-05T07:53:32.005751Z"}
键是字符串(像爱丽丝、约翰...这样的名字)。 例如我需要结果:
{"Mike": 2}
{"John": 1}
或
{"key":"Mike", "count": 2}
{"key":"John", "count": 1}
接下来我尝试了:
/**
 * Builds a topology that counts how many records arrive per key on the
 * {@code bank-transactions} topic and publishes the running totals to
 * {@code person-transaction-frequency}.
 *
 * <p>The counts are written with {@code Serdes.Long()}, i.e. as binary longs rather than
 * text. A consumer that decodes the values as strings (for example a console consumer with
 * its default settings) will print unreadable bytes; it has to be configured with a
 * {@code LongDeserializer} for the value.
 *
 * @return the topology, ready to be passed to {@code KafkaStreams}
 */
public Topology createTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    // JSON serde used for the transaction payload.
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<String, JsonNode> transactions =
        builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));

    // count() never looks at the value, so the records are grouped as they are.
    // The previous map(...) step kept the key unchanged but still flagged the stream for
    // repartitioning, and it threw a NullPointerException for any record without an
    // "amount" field. Grouping the source stream directly avoids both problems.
    KTable<String, Long> transactionCounts = transactions
        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
        .count();

    transactionCounts.toStream()
        .to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));

    return builder.build();
}
/**
 * Application entry point: configures Kafka Streams, starts the topology produced by
 * {@code createTopology()}, and registers a JVM shutdown hook that closes the streams
 * instance.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final Properties props = new Properties();

    // Identity and connection settings.
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:29092");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-favorite-amount-application");

    // Fallback serdes for operations that do not specify their own.
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // Offset reset policy handed to the underlying consumer.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final KafkaStreams kafkaStreams =
        new KafkaStreams(new Mc4CalculateFavoriteAmount().createTopology(), props);
    kafkaStreams.start();

    // Close the streams application cleanly when the JVM shuts down.
    final Thread closer = new Thread(() -> kafkaStreams.close());
    Runtime.getRuntime().addShutdownHook(closer);
}
我正在尝试按名称(键)对消息进行计数。但是我在输出主题中得到了乱码(artifacts):
Welcome to Stack Overflow! You seem to be asking for someone to write some code for you. Stack Overflow is a question and answer site, not a code-writing service. Please see here to learn how to write effective questions.
您可以根据您的用例调整此 official Confluent Example。这个例子和你问的很相似。
为了进一步解释这一点,您需要创建一个流应用程序,您可以在其中将主题中的数据读入 KStream。您尚未提供有关您的键(key)的信息。在 Confluent 示例中,记录使用 map() 方法显式重新分区,为每条记录创建一个新的 KeyValue 实例(您可以使用 amount 作为键)。然后事件按键分组并计数。
如果您只想对键进行计数,则可以舍弃整个值并将其替换为 1,以表示每个看到的键。
// Read the payload as opaque bytes: only the key matters for counting.
KStream<String, Bytes> textLines = builder.stream(
    "bank-transactions",
    Consumed.with(Serdes.String(), Serdes.Bytes()));

// Replace every value with the constant 1 so each record just marks one occurrence of its key.
KStream<String, Long> occurrences = textLines.mapValues(ignored -> 1L);

// Group by the existing key and count the records per key.
KTable<String, Long> wordCounts = occurrences
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
    .count();

// Publish every count update to the output topic.
KStream<String, Long> countUpdates = wordCounts.toStream();
countUpdates.to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));