如何将 KStream 聚合到固定大小的列表?
How to aggregate KStream to list of fixed size?
与这个问题类似但略有不同:,我想在将 KStream
发送给消费者之前对来自 KStream
的消息进行批处理。
但是,不应将此下推安排在固定时间 window,而是安排在每个键的固定消息计数阈值上。
首先想到 2 个问题:
1) 自定义 AbstractProcessor
应该如何处理?大致如下:
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, Message[]> it = messageStore.all();
while (it.hasNext())
KeyValue<String, Message[]> entry = it.next();
if (entry.value.length > 10) {
this.context.forward(entry.key, entry.value);
entry.value = new Message[10]();
}
}
}
2) 由于 StateStore
可能会爆炸(如果条目值永远不会达到阈值才能被转发),那么 'garbage-collect' 的最佳方法是什么?我可以制定一个基于时间的计划并删除太旧的密钥...但这看起来非常 DIY 且容易出错。
我想这行得通。应用基于时间的 'garbage collection' 听起来也很合理。是的,使用 Processor API 而不是 DSL 有一些 DIY 的味道——这首先不是 PAPI 的目的(授权用户做任何需要做的事情)。
一些评论:
- 您将需要一个更复杂的数据结构:因为
punctuate()
是基于流时间进度调用的,所以可能会发生两次调用之间的一个键有超过 10 条记录。因此,您需要像 KeyValueIterator<String, List<Message[]>> it = messageStore.all();
这样的东西才能为每个键存储多个批次。
- 我假设您需要微调 punctuate 的时间表,这将很棘手 -- 如果您的时间表太紧,许多批次可能尚未完成,您会浪费 CPU -- 如果您的日程安排太松散,您将需要大量内存,并且您的下游操作员将在您一次发出大量内容时获得大量数据。向下游发送突发数据可能会成为一个问题。
- 扫描整个商店的成本很高——根据批量大小尝试 "sort" 键值对似乎是个好主意。这应该使您只能触摸已完成批处理的键,而不是所有键。也许你可以在内存中保留一个包含已完成批处理的键的列表,并且只对那些进行查找(如果失败,你需要对存储中的所有键进行一次传递以重新创建这个内存列表)。
与这个问题类似但略有不同:KStream
发送给消费者之前对来自 KStream
的消息进行批处理。
但是,不应将此下推安排在固定时间 window,而是安排在每个键的固定消息计数阈值上。
首先想到 2 个问题:
1) 自定义 AbstractProcessor
应该如何处理?大致如下:
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, Message[]> it = messageStore.all();
while (it.hasNext())
KeyValue<String, Message[]> entry = it.next();
if (entry.value.length > 10) {
this.context.forward(entry.key, entry.value);
entry.value = new Message[10]();
}
}
}
2) 由于 StateStore
可能会爆炸(如果条目值永远不会达到阈值才能被转发),那么 'garbage-collect' 的最佳方法是什么?我可以制定一个基于时间的计划并删除太旧的密钥...但这看起来非常 DIY 且容易出错。
我想这行得通。应用基于时间的 'garbage collection' 听起来也很合理。是的,使用 Processor API 而不是 DSL 有一些 DIY 的味道——这首先不是 PAPI 的目的(授权用户做任何需要做的事情)。
一些评论:
- 您将需要一个更复杂的数据结构:因为
punctuate()
是基于流时间进度调用的,所以可能会发生两次调用之间的一个键有超过 10 条记录。因此,您需要像KeyValueIterator<String, List<Message[]>> it = messageStore.all();
这样的东西才能为每个键存储多个批次。 - 我假设您需要微调 punctuate 的时间表,这将很棘手 -- 如果您的时间表太紧,许多批次可能尚未完成,您会浪费 CPU -- 如果您的日程安排太松散,您将需要大量内存,并且您的下游操作员将在您一次发出大量内容时获得大量数据。向下游发送突发数据可能会成为一个问题。
- 扫描整个商店的成本很高——根据批量大小尝试 "sort" 键值对似乎是个好主意。这应该使您只能触摸已完成批处理的键,而不是所有键。也许你可以在内存中保留一个包含已完成批处理的键的列表,并且只对那些进行查找(如果失败,你需要对存储中的所有键进行一次传递以重新创建这个内存列表)。