Kafka Streams Processor API 按大小和时间进行批处理
Kafka Streams Processor API batching on size and time
正在尝试使用 kafka 流处理器 API 对记录进行批处理。批处理基于大小和时间。比方说,如果批处理大小达到 10 或最后一批处理时间超过 10 秒(大小或最后处理时间,以先到者为准),则调用外部 API 发送批处理并使用 ProcessingContext 提交。
使用punctuate
定期检查批次是否可以清除并发送到外部系统。
问题 - 处理器API process
方法可以在标点线程执行时被流API调用吗?由于代码是在punctuate 线程中调用commit 是否可以context.commit()
commit 尚未被process 方法处理的记录?
难不成标点线程和process方法在不同线程同时执行?如果是这样,那么我有尚未处理的提交记录的代码
public class TestProcessor extends AbstractProcessor<String, String> {
private ProcessorContext context;
private List<String> batchList = new LinkedList<>();
private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
@Override
public void init(ProcessorContext context) {
LOG.info("Calling init method " + context.taskId());
this.context = context;
context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if(batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() >
10000){
//call external API
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
});
}
@Override
public void process(String key, String value) {
batchList.add(value);
LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
"storeSize " + batchList.size());
if(batchList.size() == 10){
//call external API to send the batch
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
}
@Override
public void close() {
if(batchList.size() > 0){
//call external API to send the left over records
batchList.clear();
}
}
}
Can the processor API process
method be invoked by streams API when
the punctuate
thread is being executed?
不,这是不可能的,因为 Processor
在单个线程中执行 process
和 punctuate
方法(两个方法使用同一个线程)。
Is it possible that the punctuate thread and process method being
executed at the same time in different threads?
回答是“不可能”,如上文所述。
考虑到每个主题分区都有自己的 class TestProcessor
实例。而不是局部变量 batchList
和 lastProcessedTime
我建议使用像 KeyValueStore
这样的 Kafka 状态存储,这样你的流将是容错的。
正在尝试使用 kafka 流处理器 API 对记录进行批处理。批处理基于大小和时间。比方说,如果批处理大小达到 10 或最后一批处理时间超过 10 秒(大小或最后处理时间,以先到者为准),则调用外部 API 发送批处理并使用 ProcessingContext 提交。
使用punctuate
定期检查批次是否可以清除并发送到外部系统。
问题 - 处理器API process
方法可以在标点线程执行时被流API调用吗?由于代码是在punctuate 线程中调用commit 是否可以context.commit()
commit 尚未被process 方法处理的记录?
难不成标点线程和process方法在不同线程同时执行?如果是这样,那么我有尚未处理的提交记录的代码
public class TestProcessor extends AbstractProcessor<String, String> {
private ProcessorContext context;
private List<String> batchList = new LinkedList<>();
private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
@Override
public void init(ProcessorContext context) {
LOG.info("Calling init method " + context.taskId());
this.context = context;
context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if(batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() >
10000){
//call external API
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
});
}
@Override
public void process(String key, String value) {
batchList.add(value);
LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
"storeSize " + batchList.size());
if(batchList.size() == 10){
//call external API to send the batch
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
}
@Override
public void close() {
if(batchList.size() > 0){
//call external API to send the left over records
batchList.clear();
}
}
}
Can the processor API
process
method be invoked by streams API when thepunctuate
thread is being executed?
不,这是不可能的,因为 Processor
在单个线程中执行 process
和 punctuate
方法(两个方法使用同一个线程)。
Is it possible that the punctuate thread and process method being executed at the same time in different threads?
回答是“不可能”,如上文所述。
考虑到每个主题分区都有自己的 class TestProcessor
实例。而不是局部变量 batchList
和 lastProcessedTime
我建议使用像 KeyValueStore
这样的 Kafka 状态存储,这样你的流将是容错的。