如何使用 kafka 流处理 chunks/batches 中的数据?
how to process data in chunks/batches with kafka streams?
对于大数据中的许多情况,最好一次处理一小块记录缓冲区,而不是一次处理一条记录。
自然的例子是调用一些支持批处理以提高效率的外部 API。
我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。
到目前为止我有:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
我想要的是:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
在 Scala 和 Akka Streams 中,该函数称为 grouped
或 batch
。在 Spark Structured Streaming 中我们可以做 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))
.
我怀疑,如果 Kafka 流目前像其他工具一样支持固定大小 windows。
但是有基于时间的 windows,由 kafka 流支持。 https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
您可以用时间定义 window 大小而不是记录数。
- 翻滚时间windows
- 滑动时间window
- 会话 window
- 跳跃时间window
对于您的情况,Tumbling Time Window 可以作为一个选项使用。这些是非重叠的,固定大小的时间 window.
For example, tumbling windows with a size of 5000ms have predictable
window boundaries [0;5000),[5000;10000),... — and not
[1000;6000),[6000;11000),... or even something “random” like
[1452;6452),[6452;11452),....
似乎还不存在。看这个spacehttps://issues.apache.org/jira/browse/KAFKA-7432
您可以使用队列。如下所示,
@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> {
public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) {
super(configuration);
}
@Override
Topology buildTopology() {
KStream<String, String> kStream = streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
// .peek((key, value) -> log.info("message received by stream 0"));
kStream.process(() -> new AbstractProcessor<String, String>() {
final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
final List<String> collection = new ArrayList<>();
@Override
public void init(ProcessorContext context) {
super.init(context);
context.schedule(Duration.of(1, ChronoUnit.MINUTES), WALL_CLOCK_TIME, timestamp -> {
processQueue();
context().commit();
});
}
@Override
public void process(String key, String value) {
queue.add(value);
if (queue.remainingCapacity() == 0) {
processQueue();
}
}
public void processQueue() {
queue.drainTo(collection);
long count = collection.stream().peek(System.out::println).count();
if (count > 0) {
System.out.println("count is " + count);
collection.clear();
}
}
});
kStream.to("normalTopic1");
return streamsBuilder.build();
}
}
对于大数据中的许多情况,最好一次处理一小块记录缓冲区,而不是一次处理一条记录。
自然的例子是调用一些支持批处理以提高效率的外部 API。
我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。
到目前为止我有:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
我想要的是:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
在 Scala 和 Akka Streams 中,该函数称为 grouped
或 batch
。在 Spark Structured Streaming 中我们可以做 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))
.
我怀疑,如果 Kafka 流目前像其他工具一样支持固定大小 windows。
但是有基于时间的 windows,由 kafka 流支持。 https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
您可以用时间定义 window 大小而不是记录数。
- 翻滚时间windows
- 滑动时间window
- 会话 window
- 跳跃时间window
对于您的情况,Tumbling Time Window 可以作为一个选项使用。这些是非重叠的,固定大小的时间 window.
For example, tumbling windows with a size of 5000ms have predictable window boundaries [0;5000),[5000;10000),... — and not [1000;6000),[6000;11000),... or even something “random” like [1452;6452),[6452;11452),....
似乎还不存在。看这个spacehttps://issues.apache.org/jira/browse/KAFKA-7432
您可以使用队列。如下所示,
@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> {
public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) {
super(configuration);
}
@Override
Topology buildTopology() {
KStream<String, String> kStream = streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
// .peek((key, value) -> log.info("message received by stream 0"));
kStream.process(() -> new AbstractProcessor<String, String>() {
final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
final List<String> collection = new ArrayList<>();
@Override
public void init(ProcessorContext context) {
super.init(context);
context.schedule(Duration.of(1, ChronoUnit.MINUTES), WALL_CLOCK_TIME, timestamp -> {
processQueue();
context().commit();
});
}
@Override
public void process(String key, String value) {
queue.add(value);
if (queue.remainingCapacity() == 0) {
processQueue();
}
}
public void processQueue() {
queue.drainTo(collection);
long count = collection.stream().peek(System.out::println).count();
if (count > 0) {
System.out.println("count is " + count);
collection.clear();
}
}
});
kStream.to("normalTopic1");
return streamsBuilder.build();
}
}