Buffering transformed messages (for example, 1000 at a time) using Apache Flink stream processing

I'm using Apache Flink for stream processing.

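After subscribing to messages from a source (e.g. Kafka, AWS Kinesis Data Streams) and applying transformations, aggregations, etc. to the streaming data using Flink operators, I want to buffer the final messages (e.g. 1000 at a time) and post each batch in a single request to an external REST API.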

How can I implement a buffering mechanism (creating a batch for every 1000 records) in Apache Flink?

Flink pipeline: streaming source --> transform/reduce using operators --> buffer 1000 messages --> post to REST API

Thanks for your help!

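I'd create a sink with state that holds on to the incoming messages. When the count gets high enough (1000), the sink sends the batch. The state can be in memory (e.g. an instance variable that holds an ArrayList of messages), but you should use checkpoints so that you can recover that state after a failure.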
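The count-based buffering itself doesn't depend on Flink, so here's a minimal plain-Java sketch of it. BatchBuffer and its sender callback are hypothetical names I made up for illustration, not Flink APIs; in a real sink, the method that receives each record would call add(), and the callback would do the HTTP POST.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects items and hands them off in fixed-size batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sender; // e.g. a function that POSTs the batch
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Adds one item and sends the whole batch once the threshold is reached. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered (possibly fewer than batchSize items). */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Hand the sender a copy, and clear only after it returns, so that
        // a failed send (an exception) leaves the items in the buffer.
        sender.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Number of items currently waiting to be sent. */
    int size() {
        return buffer.size();
    }
}
```

With a batch size of 1000, the first 999 calls to add() only accumulate, and the 1000th triggers a single call to the sender with all 1000 items.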

When your sink has checkpointed state, it needs to implement CheckpointedFunction (in org.apache.flink.streaming.api.checkpoint), which means you need to add two methods to your sink:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

    checkpointedState.clear();

    // HttpSinkStateItem is a user-written class 
    // that just holds a collection of messages (Strings, in this case)
    //
    // Buffer is declared as ArrayList<String>

    checkpointedState.add(new HttpSinkStateItem(buffer));

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    // Mix and match different kinds of states as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //        - types are list and union        
    //   - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
    //        - types are value, list, reducing, aggregating and map
    //   - Distinguish between state data using state name (e.g. "HttpSink-State")      

    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

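    if (context.isRestored()) {

        // After rescaling, one subtask may receive several state items,
        // so merge them all instead of keeping only the last one.
        buffer = new ArrayList<>();
        for (HttpSinkStateItem item : checkpointedState.get()) {
            buffer.addAll(item.getPending());
        }

    }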

}
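For reference, here's one way the user-written HttpSinkStateItem could look. This is only a sketch under my own assumptions: the constructor and getPending() match how the class is used above, but the defensive copy and the Serializable marker are my choices, and how Flink ends up serializing the class is not addressed here.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Sketch of the state holder: a snapshot of the messages not yet sent. */
final class HttpSinkStateItem implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ArrayList<String> pending;

    HttpSinkStateItem(List<String> pending) {
        // Copy the list so later changes to the sink's live buffer
        // do not alter what was captured for the checkpoint.
        this.pending = new ArrayList<>(pending);
    }

    /** Returns the messages captured when this item was created. */
    List<String> getPending() {
        return pending;
    }
}
```

The copy in the constructor matters because snapshotState passes the live buffer, which the sink keeps modifying after the snapshot is taken.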

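If the count doesn't reach your threshold, you can also use a timer in the sink (if the input stream is keyed/partitioned) to send whatever is buffered periodically.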
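To illustrate combining the count threshold with a time limit, here's a plain-Java sketch. TimedBatchBuffer, onTick() and the injected clock are hypothetical names of mine, not Flink APIs; the assumption is that something fires periodically (in Flink, for example, a processing-time timer on a keyed stream) and calls onTick().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper: flushes on a count threshold or when a batch gets too old. */
final class TimedBatchBuffer<T> {
    private final int maxCount;
    private final long maxAgeMillis;
    private final LongSupplier clock;       // supplies the current time in milliseconds
    private final Consumer<List<T>> sender; // e.g. a function that POSTs the batch
    private final List<T> buffer = new ArrayList<>();
    private long oldestItemTime;            // arrival time of the first item in the batch

    TimedBatchBuffer(int maxCount, long maxAgeMillis,
                     LongSupplier clock, Consumer<List<T>> sender) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
        this.sender = sender;
    }

    /** Adds one item; flushes immediately if the count threshold is reached. */
    void add(T item) {
        if (buffer.isEmpty()) {
            oldestItemTime = clock.getAsLong();
        }
        buffer.add(item);
        if (buffer.size() >= maxCount) {
            flush();
        }
    }

    /** Call periodically; flushes a partial batch that has waited too long. */
    void onTick() {
        if (!buffer.isEmpty() && clock.getAsLong() - oldestItemTime >= maxAgeMillis) {
            flush();
        }
    }

    private void flush() {
        sender.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Passing the clock in, rather than calling System.currentTimeMillis() directly, keeps the logic easy to test and lets the caller decide which notion of time to use.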