How does this aggregation work in the Kafka stream?
I am new to Apache Kafka. I was reading the code of a Streams application and stumbled upon the aggregation operation. I tried to understand it on my own, and I need confirmation that my interpretation is correct.
The snippet that reads from the topic and aggregates is provided below:
// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStreamBuilder builder = new KStreamBuilder();
// read from the topic 'bank-transactions' as `KStream`. I provided the producer below
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
// we define the grouping and aggregation here
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
        .aggregate(
                () -> initialBalance,
                (key, transaction, balance) -> newBalance(transaction, balance),
                jsonSerde,
                "bank-balance-agg"
        );
The stream of data for the bank-transactions topic is produced as follows:
public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty json object {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();
    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);
    // Instant.now() gets the current time (Java 8)
    Instant now = Instant.now();
    // write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());
    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
The balance is initialized as follows:
// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
initialBalance.put("count", 0);
initialBalance.put("balance", 0);
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());
The newBalance method takes a transaction and a balance and returns the new balance:
private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
    // create a new balance json object
    ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
    newBalance.put("count", balance.get("count").asInt() + 1);
    newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());
    Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
    Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
    Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
    newBalance.put("time", newBalanceInstant.toString());
    return newBalance;
}
I have two questions about the grouping and aggregation:
a. Is groupByKey grouping by Serdes.String(), while jsonSerde only performs the serialization and deserialization of the stream data? Serdes.String() corresponds to the name string in the newRandomTransaction method.
b. My assertion is that key and transaction in the aggregation lambda (key, transaction, balance) -> newBalance(transaction, balance) are read from the bank-transactions topic, while balance comes from the initialBalance in the previous line. Is that right?
I was also confused while trying to debug the application, even though it runs seamlessly.
Is groupByKey grouping by the Serdes.String() and is the jsonSerde only performing the serialization and deserialization for the stream data?
Yes, groupByKey groups by the key; the keys can be deserialized and compared as strings.
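To make the split of responsibilities concrete, here is a minimal plain-Java sketch (no Kafka dependency; the record data is made up for illustration) of what grouping by key conceptually means: the key itself drives the grouping, while a serde would only convert the key and value to and from bytes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByKeySketch {
    public static void main(String[] args) {
        // records as (key, value) pairs, like the ProducerRecords above;
        // the key is the "name" used when producing to bank-transactions
        List<String[]> records = List.of(
                new String[]{"john", "{\"amount\":10}"},
                new String[]{"jane", "{\"amount\":20}"},
                new String[]{"john", "{\"amount\":5}"});

        // groupByKey: collect records per key; the serde plays no part
        // in deciding which group a record falls into
        Map<String, List<String>> grouped = new HashMap<>();
        for (String[] r : records) {
            grouped.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }

        System.out.println(grouped.get("john").size()); // prints 2
    }
}
```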
My assertion is that the key and transaction inside the aggregation lambda (key, transaction, balance) -> newBalance(transaction, balance) are read from the bank-transactions topic, and the balance comes from the initialBalance in the previous line.
Almost. The initializer is the first argument, yes, but the aggregation result is carried forward for the whole execution of the application, aggregating endlessly.
In other words, you always start from initialBalance, and then for every record with the same key you add that transaction's amount to the balance currently accumulated for that key. Only when a key has not been seen before is the transaction applied to the initial balance.
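The carry-forward behavior can be sketched in plain Java (reducing the JSON balance to a single int for brevity; the keys and amounts are invented for illustration): the initializer is consulted only for a key's first record, after which the stored aggregate keeps folding in new transactions.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregateSketch {
    // the aggregator from the question, reduced to plain ints:
    // new balance = accumulated balance + transaction amount
    static int newBalance(int transactionAmount, int balance) {
        return balance + transactionAmount;
    }

    public static void main(String[] args) {
        int initialBalance = 0; // the initializer, () -> initialBalance

        // transactions as (key, amount) pairs, in arrival order
        String[] keys = {"john", "jane", "john"};
        int[] amounts = {10, 20, 5};

        // the state store: one running aggregate per key, carried
        // forward across records for the lifetime of the application
        Map<String, Integer> store = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // initialBalance is only used when the key is new
            int current = store.getOrDefault(keys[i], initialBalance);
            store.put(keys[i], newBalance(amounts[i], current));
        }

        System.out.println(store.get("john")); // prints 15 (0 + 10 + 5)
    }
}
```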
And yes, your input topic is the one specified in the Kafka Streams builder.stream call.