窗口化 KTable 上的 Kafka 操作
Kafka Operations on Windowed KTables
我想对窗口化的 KTable 做一些进一步的操作。为了提供一些背景知识,我有一个主题,其数据形式为:{clientId, txTimestamp, txAmount}
。从这个主题,我创建了一个流,由 clientId 分区,底层主题时间戳等于 txTimestamp 事件字段。从这个流开始,我想每 1 小时汇总每个 clientId 的事务数 windows。这是通过类似于以下内容完成的:
CREATE TABLE transactions_per_client WITH (kafka_topic='transactions_per_client_topic') AS SELECT clientId, COUNT(*) AS transactions_per_client, WINDOWSTART AS window_start, WINDOWEND AS window_end FROM transactions_stream WINDOW TUMBLING (SIZE 1 HOURS) GROUP BY clientId;
聚合按预期工作,产生的值类似于:
ClientId
Transactions_per_client
windowsStart
WindowEnd
1
12
1
2
2
8
1
2
1
24
2
3
1
19
3
4
我现在想做的是进一步处理这个 table 以添加一个列,该列表示同一客户的 2 个相邻 windows 之间每个客户的交易数量差异。对于之前的 table,应该是这样的:
ClientId
Transactions_per_client
windowsStart
WindowEnd
Deviation
1
12
1
2
0
2
8
1
2
0
1
24
2
3
12
1
19
3
4
-5
实现此目标的最佳方法是什么(使用 kafka 流或 ksql)?我尝试使用用户定义的聚合函数来尝试创建此列,但它不能应用于 KTable,只能应用于 KStream。
仅供将来参考,目前(2022 年 4 月)的官方回答是无法通过 DSL 在 kafka-streams 中完成,因为“Windowed-TABLE 有点像‘死胡同’在 ksqlDB atm 以及 Kafka Streams 中,您不能真正使用 DSL 来进一步处理数据”(在此处的 Confluent 论坛上回答:https://forum.confluent.io/t/aggregations-on-windowed-ktables/4340)。那里的建议是使用处理器 API,这确实可以非常简单地实现。在高级伪代码中,它将是这样的:
topology.addSource(NAME_OF_SOURCE_IN_THE_NEW_TOPOLOGY,
timeWindowedDeserializer, LongDeserializer, SOURCE_TOPIC -> the topic with the windowed KTable);
topology.addProcessor(
NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY,
() -> new Aggregator(storeName),
NAME_OF_SOURCE_IN_THE_NEW_TOPOLOGY);
StoreBuilder storeBuilder = keyValueStoreBuilder for the timeWindowedSerde and a Long serde for value;
topology.addStateStore(storeBuilder, NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY);
topology.addSink(
NAME_OF_SINK_IN_THE_NEW_TOPOLOGY,
sinkTopic,
timeWindowedSerializer,
Serializer for the new structure -> POJO that contains the deviation field,
NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY);
上一节中的聚合器是一个 org.apache.kafka.streams.processor.api.Processor 实现,它跟踪它看到的值并能够检索给定键的先前看到的值。
同样,在高层次上,它类似于:
Long previousTransactionAggregate = kvStore.get(previousWindow);
long deviation;
if (previousTransactionAggregate != null) {
deviation = kafkaRecord.value() - previousTransactionAggregate;
} else {
deviation = 0L;
}
kvStore.put(kafkaRecord.key(), kafkaRecord.value());
Record<Windowed<Long>, TransactionPerNumericKey> newRecord =
new Record<>(
kafkaRecord.key(),
new TransactionPerNumericKey(
kafkaRecord.key().key(), kafkaRecord.value(), deviation),
kafkaRecord.timestamp());
context.forward(newRecord);
上一节中的TransactionPerNumericKey是增强型窗口聚合的结构体名称(包含偏差值)
我想对窗口化的 KTable 做一些进一步的操作。为了提供一些背景知识,我有一个主题,其数据形式为:{clientId, txTimestamp, txAmount}
。从这个主题,我创建了一个流,由 clientId 分区,底层主题时间戳等于 txTimestamp 事件字段。从这个流开始,我想每 1 小时汇总每个 clientId 的事务数 windows。这是通过类似于以下内容完成的:
CREATE TABLE transactions_per_client WITH (kafka_topic='transactions_per_client_topic') AS SELECT clientId, COUNT(*) AS transactions_per_client, WINDOWSTART AS window_start, WINDOWEND AS window_end FROM transactions_stream WINDOW TUMBLING (SIZE 1 HOURS) GROUP BY clientId;
聚合按预期工作,产生的值类似于:
ClientId | Transactions_per_client | windowsStart | WindowEnd |
---|---|---|---|
1 | 12 | 1 | 2 |
2 | 8 | 1 | 2 |
1 | 24 | 2 | 3 |
1 | 19 | 3 | 4 |
我现在想做的是进一步处理这个 table 以添加一个列,该列表示同一客户的 2 个相邻 windows 之间每个客户的交易数量差异。对于之前的 table,应该是这样的:
ClientId | Transactions_per_client | windowsStart | WindowEnd | Deviation |
---|---|---|---|---|
1 | 12 | 1 | 2 | 0 |
2 | 8 | 1 | 2 | 0 |
1 | 24 | 2 | 3 | 12 |
1 | 19 | 3 | 4 | -5 |
实现此目标的最佳方法是什么(使用 kafka 流或 ksql)?我尝试使用用户定义的聚合函数来尝试创建此列,但它不能应用于 KTable,只能应用于 KStream。
仅供将来参考,目前(2022 年 4 月)的官方回答是无法通过 DSL 在 kafka-streams 中完成,因为“Windowed-TABLE 有点像‘死胡同’在 ksqlDB atm 以及 Kafka Streams 中,您不能真正使用 DSL 来进一步处理数据”(在此处的 Confluent 论坛上回答:https://forum.confluent.io/t/aggregations-on-windowed-ktables/4340)。那里的建议是使用处理器 API,这确实可以非常简单地实现。在高级伪代码中,它将是这样的:
topology.addSource(NAME_OF_SOURCE_IN_THE_NEW_TOPOLOGY,
timeWindowedDeserializer, LongDeserializer, SOURCE_TOPIC -> the topic with the windowed KTable);
topology.addProcessor(
NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY,
() -> new Aggregator(storeName),
NAME_OF_SOURCE_IN_THE_NEW_TOPOLOGY);
StoreBuilder storeBuilder = keyValueStoreBuilder for the timeWindowedSerde and a Long serde for value;
topology.addStateStore(storeBuilder, NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY);
topology.addSink(
NAME_OF_SINK_IN_THE_NEW_TOPOLOGY,
sinkTopic,
timeWindowedSerializer,
Serializer for the new structure -> POJO that contains the deviation field,
NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY);
上一节中的聚合器是一个 org.apache.kafka.streams.processor.api.Processor 实现,它跟踪它看到的值并能够检索给定键的先前看到的值。 同样,在高层次上,它类似于:
Long previousTransactionAggregate = kvStore.get(previousWindow);
long deviation;
if (previousTransactionAggregate != null) {
deviation = kafkaRecord.value() - previousTransactionAggregate;
} else {
deviation = 0L;
}
kvStore.put(kafkaRecord.key(), kafkaRecord.value());
Record<Windowed<Long>, TransactionPerNumericKey> newRecord =
new Record<>(
kafkaRecord.key(),
new TransactionPerNumericKey(
kafkaRecord.key().key(), kafkaRecord.value(), deviation),
kafkaRecord.timestamp());
context.forward(newRecord);
上一节中的TransactionPerNumericKey是增强型窗口聚合的结构体名称(包含偏差值)