使用 Kafka 流从多个主题中累积事件
Accumulate Events from Multiple Topics using Kafka Streams
如果这是一个愚蠢的问题,我深表歉意。
我有一个场景,其中我有来自上游服务的 3 个主题(未键控)。不幸的是,我无法更改这 3 个主题的行为。
上游服务在一天结束时批量发布所有消息,我需要获得交易的累积视图,因为交易的顺序对下游服务很重要。
我知道我无法对主题的不同分区中的消息重新排序,所以我想我是否可以累积它们,然后我的服务可以获取累积的结果并在处理之前重新排序。
但是,我注意到一个奇怪的行为,我希望有人能澄清我遗漏了什么。
当我对 1 到 500 个帐户进行操作时,我看到在输出主题中累积并显示了 500 条消息。
但是,当我对 10,000 个帐户尝试相同的操作时,我看到的输出比应有的多。 (关于输出主题的 13,000 条消息)。
KStream<String, TransactionAccumulator> transactions =
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(k, v) -> v.getAccountId(),
with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
(aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
Materialized.with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.toStream((key, value) -> key.key());
如前所述,上游服务在一天结束时(而不是实时)批量发布所有事件。
感谢我在这里遗漏的内容,因为对于较小的卷,它似乎有效。
更新 1
我尝试了使用 suppression 提出的建议,尝试只发送最后的 window。
然而,使用这个时,它基本上不会向输出主题发布任何消息,虽然我看到 "KTABLE-SUPPRESS-STATE-STORE"
中有消息
更新后的抑制代码如下。
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(key, value) -> value.getAccountId(),
Grouped.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMinutes(1)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
Materialized.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.mapValues(
value -> {
LOGGER.info(
"Sending {} Transactions for {}",
value.getTransactions().size(),
value.getAccountId());
return value;
})
.toStream((key, value) -> key.key());
我也没有看到介绍的日志信息。为了清楚起见,我在这个实验中使用 Spring Cloud Stream,我在 stream-app 上看到的最终日志条目如下。
INFO 23436 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [StreamConsumer-consume-applicationId-de25a238-5f0f-4d84-9bd2-3e7b01b7f0b3] State transition from REBALANCING to RUNNING
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
抱歉,我还不能发表评论,但这是我的两分钱:
KGroupedStream.aggregate()
:Kafka Stream 使用 record cache 来控制聚合更新从 aggregate
的物化视图(或 KTable)发送到状态存储和下游处理器的速率。例如消息:
("word1", 4)
("word1", 2)
("word2", 3)
("word1", 1)
以及您的字数统计拓扑:
wordCntPerSentenceKStream
.groupByKey()
.aggregate(() -> 0, (word, newWordCnt, aggsWordCnt) -> aggsWordCnt + newWordCnt, Materialized.as("word-cnt-store").withValueSerde(Serdes.Integer())
.toStream();
您可能会收到这样的下游消息:
("word1", 6)
("word2", 3)
("word1", 7)
所以我的猜测是您的输入主题可能包含单个 AccountId 的多个事务,并且当缓存 (cache.max.bytes.buffering
) 已满或满足 commit.interval.ms
时刷新记录缓存。
- 如果你的接收器是幂等的,你可以用新的消息键覆盖你的
TransactionAccumulator
,或者你可以使用KTable.suppress()
,如here所述,只发出消息的最后一条消息汇总 window.
如果这是一个愚蠢的问题,我深表歉意。
我有一个场景,其中我有来自上游服务的 3 个主题(未键控)。不幸的是,我无法更改这 3 个主题的行为。
上游服务在一天结束时批量发布所有消息,我需要获得交易的累积视图,因为交易的顺序对下游服务很重要。
我知道我无法对主题的不同分区中的消息重新排序,所以我想我是否可以累积它们,然后我的服务可以获取累积的结果并在处理之前重新排序。
但是,我注意到一个奇怪的行为,我希望有人能澄清我遗漏了什么。
当我对 1 到 500 个帐户进行操作时,我看到在输出主题中累积并显示了 500 条消息。
但是,当我对 10,000 个帐户尝试相同的操作时,我看到的输出比应有的多。 (关于输出主题的 13,000 条消息)。
KStream<String, TransactionAccumulator> transactions =
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(k, v) -> v.getAccountId(),
with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
(aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
Materialized.with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.toStream((key, value) -> key.key());
如前所述,上游服务在一天结束时(而不是实时)批量发布所有事件。
感谢我在这里遗漏的内容,因为对于较小的卷,它似乎有效。
更新 1
我尝试了使用 suppression 提出的建议,尝试只发送最后的 window。
然而,使用这个时,它基本上不会向输出主题发布任何消息,虽然我看到 "KTABLE-SUPPRESS-STATE-STORE"
中有消息更新后的抑制代码如下。
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(key, value) -> value.getAccountId(),
Grouped.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMinutes(1)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
Materialized.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.mapValues(
value -> {
LOGGER.info(
"Sending {} Transactions for {}",
value.getTransactions().size(),
value.getAccountId());
return value;
})
.toStream((key, value) -> key.key());
我也没有看到介绍的日志信息。为了清楚起见,我在这个实验中使用 Spring Cloud Stream,我在 stream-app 上看到的最终日志条目如下。
INFO 23436 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [StreamConsumer-consume-applicationId-de25a238-5f0f-4d84-9bd2-3e7b01b7f0b3] State transition from REBALANCING to RUNNING
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
抱歉,我还不能发表评论,但这是我的两分钱:
KGroupedStream.aggregate()
:Kafka Stream 使用 record cache 来控制聚合更新从aggregate
的物化视图(或 KTable)发送到状态存储和下游处理器的速率。例如消息:
("word1", 4)
("word1", 2)
("word2", 3)
("word1", 1)
以及您的字数统计拓扑:
wordCntPerSentenceKStream
.groupByKey()
.aggregate(() -> 0, (word, newWordCnt, aggsWordCnt) -> aggsWordCnt + newWordCnt, Materialized.as("word-cnt-store").withValueSerde(Serdes.Integer())
.toStream();
您可能会收到这样的下游消息:
("word1", 6)
("word2", 3)
("word1", 7)
所以我的猜测是您的输入主题可能包含单个 AccountId 的多个事务,并且当缓存 (cache.max.bytes.buffering
) 已满或满足 commit.interval.ms
时刷新记录缓存。
- 如果你的接收器是幂等的,你可以用新的消息键覆盖你的
TransactionAccumulator
,或者你可以使用KTable.suppress()
,如here所述,只发出消息的最后一条消息汇总 window.