Kafka Streams Processor API 按大小和时间进行批处理

Kafka Streams Processor API batching on size and time

正在尝试使用 kafka 流处理器 API 对记录进行批处理。批处理基于大小和时间。比方说,如果批处理大小达到 10 或最后一批处理时间超过 10 秒(大小或最后处理时间,以先到者为准),则调用外部 API 发送批处理并使用 ProcessingContext 提交。

使用punctuate定期检查批次是否可以清除并发送到外部系统。

问题 - 处理器API process方法可以在标点线程执行时被流API调用吗?由于代码是在punctuate 线程中调用commit 是否可以context.commit() commit 尚未被process 方法处理的记录?

难不成标点线程和process方法在不同线程同时执行?如果是这样,那么我有尚未处理的提交记录的代码

public class TestProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;
    private List<String> batchList = new LinkedList<>();
    private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());

    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);

    @Override
    public void init(ProcessorContext context) {
        LOG.info("Calling init method " + context.taskId());
        this.context = context;
        context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if(batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() >
                    10000){
                //call external API
                batchList.clear();
                lastProcessedTime.set(System.currentTimeMillis());
            }
            context.commit();
        });

    }

    @Override
    public void process(String key, String value) {
        batchList.add(value);
        LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
                "storeSize " + batchList.size());
        if(batchList.size() == 10){
            //call external API to send the batch
            batchList.clear();
            lastProcessedTime.set(System.currentTimeMillis());
        }
        context.commit();
    }

    @Override
    public void close() {
        if(batchList.size() > 0){
            //call external API to send the left over records
            batchList.clear();
        }
    }

}

Can the processor API process method be invoked by streams API when the punctuate thread is being executed?

不,这是不可能的,因为 Processor 在单个线程中执行 processpunctuate 方法(两个方法使用同一个线程)。

Is it possible that the punctuate thread and process method being executed at the same time in different threads?

回答是“不可能”,如上文所述。

考虑到每个主题分区都有自己的 class TestProcessor 实例。而不是局部变量 batchListlastProcessedTime 我建议使用像 KeyValueStore 这样的 Kafka 状态存储,这样你的流将是容错的。