KStream 批处理 windows
KStream batch process windows
我想用KStream接口批量发送消息
我有一个 Keys/values 的流
我试图在翻滚中收集它们 window 然后我想一次处理完整的 window。
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
.aggregateByKey(
HashMap::new,
(aggKey, value, aggregate) -> {
aggregate.put(value.getUuid, value);
return aggregate;
},
TimeWindows.of("intentWindow", 100),
longSerde, mapSerde)
.foreach((wk, values) -> {
问题是每次更新 KTable 时都会调用 foreach。
我想在完成后处理整个 window。就像从 100 毫秒开始收集数据然后立即处理一样。每个人都在。
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6
在某些时候,新的 window 从地图中的 1 个条目开始。
所以我什至不知道window什么时候满了。
关于在 kafka 流中进行批处理的任何提示
现在(从 Kafka 0.10.0.0 / 0.10.0.1 开始):您描述的窗口行为是 "working as expected"。也就是说,如果您收到 1,000 条传入消息,您将(当前)始终看到 1,000 条更新通过最新版本的 Kafka / Kafka Streams 向下游传输。
展望未来:Kafka 社区正在开发新功能,以使这种更新率行为更加灵活(例如,允许您将上述描述作为您想要的行为)。有关详细信息,请参阅 KIP-63: Unify store and downstream caching in streams。
======更新======
经进一步测试,这不起作用。
正确的方法是使用 @friedrich-nietzsche 概述的处理器。我对自己的答案投反对票.... grrrr.
===================
我还在纠结这个 API(但我喜欢它,所以时间花得值:)),我不确定你在代码示例结束的下游想要完成什么,但它看起来与我的工作相似。高级别是:
从源读取对象。它代表一个键和 1:∞ 个事件,我想每 5 秒发布每个键的事件总数(或 TP5s,每 5 秒的事务数)。代码的开头看起来一样,但我使用:
- KStreamBuilder.stream
- reduceByKey
- 到 window(5000)
- 到 new stream 每 5 秒获取每个键的累积值。
- map that stream to a new KeyValue 每个键
- to 接收器主题。
在我的例子中,每个 window 周期,我可以将所有事件减少到每个键一个事件,所以这是可行的。如果您想保留每个 window 的所有单个事件,我假设可以使用 reduce 将每个实例映射到实例集合(可能使用相同的键,或者您可能需要一个新键)并在最后在每个 window 期间,下游流将一次获得一堆您的事件集合(或者可能只是所有事件的一个集合)。它看起来像这样,经过消毒和 Java 7-ish:
builder.stream(STRING_SERDE, EVENT_SERDE, SOURCE_TOPICS)
.reduceByKey(eventReducer, TimeWindows.of("EventMeterAccumulator", 5000), STRING_SERDE, EVENT_SERDE)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Event, KeyValue<String,Event>>() {
public KeyValue<String, Event> apply(final Windowed<String> key, final Event finalEvent) {
return new KeyValue<String, Event>(key.key(), new Event(key.window().end(), finalEvent.getCount());
}
}).to(STRING_SERDE, EVENT_SERDE, SINK_TOPIC);
我的实际任务是将更新从流推送到redis,但我不想单独读取/更新/写入,即使redis 很快。
我现在的解决方案是使用 KStream.process() 提供一个处理器,该处理器将添加到进程中的队列并实际处理队列。
public class BatchedProcessor extends AbstractProcessor{
...
BatchedProcessor(Writer writer, long schedulePeriodic)
@Override
public void init(ProcessorContext context) {
super.init(context);
context.schedule(schedulePeriodic);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
writer.processQueue();
context().commit();
}
@Override
public void process(Long aLong, IntentUpdateEvent intentUpdateEvent) {
writer.addToQueue(intentUpdateEvent);
}
我仍然需要测试,但它解决了我遇到的问题。人们可以很容易地以一种非常通用的方式编写这样一个处理器。 API 非常整洁干净,但是 processBatched((List batchedMessaages)-> ..., timeInterval OR countInterval) 只使用 punctuate 来处理批处理并在此时提交并在 Store 中收集 KeyValues 可能是有用的补充。
但也许它的目的是用处理器解决这个问题并保持 API 一次只在一条消息中保持低延迟焦点。
我想用KStream接口批量发送消息
我有一个 Keys/values 的流 我试图在翻滚中收集它们 window 然后我想一次处理完整的 window。
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
.aggregateByKey(
HashMap::new,
(aggKey, value, aggregate) -> {
aggregate.put(value.getUuid, value);
return aggregate;
},
TimeWindows.of("intentWindow", 100),
longSerde, mapSerde)
.foreach((wk, values) -> {
问题是每次更新 KTable 时都会调用 foreach。 我想在完成后处理整个 window。就像从 100 毫秒开始收集数据然后立即处理一样。每个人都在。
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6
在某些时候,新的 window 从地图中的 1 个条目开始。 所以我什至不知道window什么时候满了。
关于在 kafka 流中进行批处理的任何提示
现在(从 Kafka 0.10.0.0 / 0.10.0.1 开始):您描述的窗口行为是 "working as expected"。也就是说,如果您收到 1,000 条传入消息,您将(当前)始终看到 1,000 条更新通过最新版本的 Kafka / Kafka Streams 向下游传输。
展望未来:Kafka 社区正在开发新功能,以使这种更新率行为更加灵活(例如,允许您将上述描述作为您想要的行为)。有关详细信息,请参阅 KIP-63: Unify store and downstream caching in streams。
======更新======
经进一步测试,这不起作用。 正确的方法是使用 @friedrich-nietzsche 概述的处理器。我对自己的答案投反对票.... grrrr.
===================
我还在纠结这个 API(但我喜欢它,所以时间花得值:)),我不确定你在代码示例结束的下游想要完成什么,但它看起来与我的工作相似。高级别是:
从源读取对象。它代表一个键和 1:∞ 个事件,我想每 5 秒发布每个键的事件总数(或 TP5s,每 5 秒的事务数)。代码的开头看起来一样,但我使用:
- KStreamBuilder.stream
- reduceByKey
- 到 window(5000)
- 到 new stream 每 5 秒获取每个键的累积值。
- map that stream to a new KeyValue 每个键
- to 接收器主题。
在我的例子中,每个 window 周期,我可以将所有事件减少到每个键一个事件,所以这是可行的。如果您想保留每个 window 的所有单个事件,我假设可以使用 reduce 将每个实例映射到实例集合(可能使用相同的键,或者您可能需要一个新键)并在最后在每个 window 期间,下游流将一次获得一堆您的事件集合(或者可能只是所有事件的一个集合)。它看起来像这样,经过消毒和 Java 7-ish:
builder.stream(STRING_SERDE, EVENT_SERDE, SOURCE_TOPICS)
.reduceByKey(eventReducer, TimeWindows.of("EventMeterAccumulator", 5000), STRING_SERDE, EVENT_SERDE)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Event, KeyValue<String,Event>>() {
public KeyValue<String, Event> apply(final Windowed<String> key, final Event finalEvent) {
return new KeyValue<String, Event>(key.key(), new Event(key.window().end(), finalEvent.getCount());
}
}).to(STRING_SERDE, EVENT_SERDE, SINK_TOPIC);
我的实际任务是将更新从流推送到redis,但我不想单独读取/更新/写入,即使redis 很快。 我现在的解决方案是使用 KStream.process() 提供一个处理器,该处理器将添加到进程中的队列并实际处理队列。
public class BatchedProcessor extends AbstractProcessor{
...
BatchedProcessor(Writer writer, long schedulePeriodic)
@Override
public void init(ProcessorContext context) {
super.init(context);
context.schedule(schedulePeriodic);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
writer.processQueue();
context().commit();
}
@Override
public void process(Long aLong, IntentUpdateEvent intentUpdateEvent) {
writer.addToQueue(intentUpdateEvent);
}
我仍然需要测试,但它解决了我遇到的问题。人们可以很容易地以一种非常通用的方式编写这样一个处理器。 API 非常整洁干净,但是 processBatched((List batchedMessaages)-> ..., timeInterval OR countInterval) 只使用 punctuate 来处理批处理并在此时提交并在 Store 中收集 KeyValues 可能是有用的补充。
但也许它的目的是用处理器解决这个问题并保持 API 一次只在一条消息中保持低延迟焦点。