如何批量处理最大大小的 KStream 或回退到某个时间 window?
How to process a KStream in a batch of max size or fallback to a time window?
我想创建一个基于 Kafka 流的应用程序,它处理一个主题并以大小为 X(即 50)的批次获取消息,但是如果流的流量较低,则在 Y 秒内给我流中的任何内容(即 5)。
因此,我没有一条一条地处理消息,而是处理一个 List[Record]
,其中列表的大小为 50(或更少)。
这是为了使某些 I/O 绑定处理更有效率。
我知道这可以用经典的 Kafka API 来实现,但我一直在寻找一种基于流的实现,它也可以在本地处理偏移量提交,考虑到 errors/failures。
我无法在他的文档中找到任何相关内容,也无法通过四处搜索找到任何相关内容,想知道是否有人可以解决此问题。
最简单的方法可能是使用有状态 transform()
操作。每次你收到一张唱片,你就把它放进商店。当您收到 50 条记录时,您将进行处理、发出输出并从存储中删除记录。
如果您在一定时间内没有阅读限制,要强制处理,您可以注册挂钟标点符号。
@Matthias J. Sax 回答很好,我只想为此添加一个示例,我认为它可能对某些人有用。
假设我们要将传入值组合成以下类型:
public class MultipleValues { private List<String> values; }
要将消息收集到最大大小的批次中,我们需要创建转换器:
public class MultipleValuesTransformer implements Transformer<String, String, KeyValue<String, MultipleValues>> {
private ProcessorContext processorContext;
private String stateStoreName;
private KeyValueStore<String, MultipleValues> keyValueStore;
private Cancellable scheduledPunctuator;
public MultipleValuesTransformer(String stateStoreName) {
this.stateStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
scheduledPunctuator = processorContext.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::doPunctuate);
}
@Override
public KeyValue<String, MultipleValues> transform(String key, String value) {
MultipleValues itemValueFromStore = keyValueStore.get(key);
if (isNull(itemValueFromStore)) {
itemValueFromStore = MultipleValues.builder().values(Collections.singletonList(value)).build();
} else {
List<String> values = new ArrayList<>(itemValueFromStore.getValues());
values.add(value);
itemValueFromStore = itemValueFromStore.toBuilder()
.values(values)
.build();
}
if (itemValueFromStore.getValues().size() >= 50) {
processorContext.forward(key, itemValueFromStore);
keyValueStore.put(key, null);
} else {
keyValueStore.put(key, itemValueFromStore);
}
return null;
}
private void doPunctuate(long timestamp) {
KeyValueIterator<String, MultipleValues> valuesIterator = keyValueStore.all();
while (valuesIterator.hasNext()) {
KeyValue<String, MultipleValues> keyValue = valuesIterator.next();
if (nonNull(keyValue.value)) {
processorContext.forward(keyValue.key, keyValue.value);
keyValueStore.put(keyValue.key, null);
}
}
}
@Override
public void close() {
scheduledPunctuator.cancel();
}
}
我们需要创建键值存储,将其添加到 StreamsBuilder
,并使用 transform
方法
构建 KStream
流
Properties props = new Properties();
...
Serde<MultipleValues> multipleValuesSerge = Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(MultipleValues.class));
StreamsBuilder builder = new StreamsBuilder();
String storeName = "multipleValuesStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);
StoreBuilder<KeyValueStore<String, MultipleValues>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), multipleValuesSerge);
builder.addStateStore(storeBuilder);
builder.stream("source", Consumed.with(Serdes.String(), Serdes.String()))
.transform(() -> new MultipleValuesTransformer(storeName), storeName)
.print(Printed.<String, MultipleValues>toSysOut().withLabel("transformedMultipleValues"));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
通过这种方法,我们使用了我们进行聚合的传入密钥。如果您需要不是通过键收集消息,而是通过某些消息的字段收集消息,则需要以下流程来触发 KStream 上的重新平衡(通过使用中间主题):
.selectKey(..)
.through(intermediateTopicName)
.transform( ..)
似乎没有必要使用Processor
s或Transformer
s和transform()
来按计数批处理事件。常规 groupBy()
和 reduce()
/aggregate()
应该可以解决问题:
KeyValueSerde keyValueSerde = new KeyValueSerde(); // simple custom Serde
final AtomicLong batchCount = new AtomicLong(0L);
myKStream
.groupBy((k,v) -> KeyValue.pair(k, batchCount.getAndIncrement() / batchSize),
Grouped.keySerde(keyValueSerde))
.reduce(this::windowReducer) // <-- how you want to aggregate values in batch
.toStream()
.filter((k,v) -> /* pass through full batches only */)
.selectKey((k,v) -> k.key)
...
您还需要为标准 KeyValue 添加简单的 Serde
。
此选项显然仅在您不需要“标点符号”以在超时时发出不完整的批次时才有用。在分布式处理的情况下,它也不保证批处理中元素的顺序。
您还可以将 count 连接到键字符串以形成新键(而不是使用 KeyValue)。这将进一步简化示例(使用 Serdes.String())。
我想创建一个基于 Kafka 流的应用程序,它处理一个主题并以大小为 X(即 50)的批次获取消息,但是如果流的流量较低,则在 Y 秒内给我流中的任何内容(即 5)。
因此,我没有一条一条地处理消息,而是处理一个 List[Record]
,其中列表的大小为 50(或更少)。
这是为了使某些 I/O 绑定处理更有效率。
我知道这可以用经典的 Kafka API 来实现,但我一直在寻找一种基于流的实现,它也可以在本地处理偏移量提交,考虑到 errors/failures。 我无法在他的文档中找到任何相关内容,也无法通过四处搜索找到任何相关内容,想知道是否有人可以解决此问题。
最简单的方法可能是使用有状态 transform()
操作。每次你收到一张唱片,你就把它放进商店。当您收到 50 条记录时,您将进行处理、发出输出并从存储中删除记录。
如果您在一定时间内没有阅读限制,要强制处理,您可以注册挂钟标点符号。
@Matthias J. Sax 回答很好,我只想为此添加一个示例,我认为它可能对某些人有用。 假设我们要将传入值组合成以下类型:
public class MultipleValues { private List<String> values; }
要将消息收集到最大大小的批次中,我们需要创建转换器:
public class MultipleValuesTransformer implements Transformer<String, String, KeyValue<String, MultipleValues>> {
private ProcessorContext processorContext;
private String stateStoreName;
private KeyValueStore<String, MultipleValues> keyValueStore;
private Cancellable scheduledPunctuator;
public MultipleValuesTransformer(String stateStoreName) {
this.stateStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
scheduledPunctuator = processorContext.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::doPunctuate);
}
@Override
public KeyValue<String, MultipleValues> transform(String key, String value) {
MultipleValues itemValueFromStore = keyValueStore.get(key);
if (isNull(itemValueFromStore)) {
itemValueFromStore = MultipleValues.builder().values(Collections.singletonList(value)).build();
} else {
List<String> values = new ArrayList<>(itemValueFromStore.getValues());
values.add(value);
itemValueFromStore = itemValueFromStore.toBuilder()
.values(values)
.build();
}
if (itemValueFromStore.getValues().size() >= 50) {
processorContext.forward(key, itemValueFromStore);
keyValueStore.put(key, null);
} else {
keyValueStore.put(key, itemValueFromStore);
}
return null;
}
private void doPunctuate(long timestamp) {
KeyValueIterator<String, MultipleValues> valuesIterator = keyValueStore.all();
while (valuesIterator.hasNext()) {
KeyValue<String, MultipleValues> keyValue = valuesIterator.next();
if (nonNull(keyValue.value)) {
processorContext.forward(keyValue.key, keyValue.value);
keyValueStore.put(keyValue.key, null);
}
}
}
@Override
public void close() {
scheduledPunctuator.cancel();
}
}
我们需要创建键值存储,将其添加到 StreamsBuilder
,并使用 transform
方法
KStream
流
Properties props = new Properties();
...
Serde<MultipleValues> multipleValuesSerge = Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(MultipleValues.class));
StreamsBuilder builder = new StreamsBuilder();
String storeName = "multipleValuesStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);
StoreBuilder<KeyValueStore<String, MultipleValues>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), multipleValuesSerge);
builder.addStateStore(storeBuilder);
builder.stream("source", Consumed.with(Serdes.String(), Serdes.String()))
.transform(() -> new MultipleValuesTransformer(storeName), storeName)
.print(Printed.<String, MultipleValues>toSysOut().withLabel("transformedMultipleValues"));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
通过这种方法,我们使用了我们进行聚合的传入密钥。如果您需要不是通过键收集消息,而是通过某些消息的字段收集消息,则需要以下流程来触发 KStream 上的重新平衡(通过使用中间主题):
.selectKey(..)
.through(intermediateTopicName)
.transform( ..)
似乎没有必要使用Processor
s或Transformer
s和transform()
来按计数批处理事件。常规 groupBy()
和 reduce()
/aggregate()
应该可以解决问题:
KeyValueSerde keyValueSerde = new KeyValueSerde(); // simple custom Serde
final AtomicLong batchCount = new AtomicLong(0L);
myKStream
.groupBy((k,v) -> KeyValue.pair(k, batchCount.getAndIncrement() / batchSize),
Grouped.keySerde(keyValueSerde))
.reduce(this::windowReducer) // <-- how you want to aggregate values in batch
.toStream()
.filter((k,v) -> /* pass through full batches only */)
.selectKey((k,v) -> k.key)
...
您还需要为标准 KeyValueSerde
。
此选项显然仅在您不需要“标点符号”以在超时时发出不完整的批次时才有用。在分布式处理的情况下,它也不保证批处理中元素的顺序。
您还可以将 count 连接到键字符串以形成新键(而不是使用 KeyValue)。这将进一步简化示例(使用 Serdes.String())。