Buffering transformed messages (e.g., 1000 records) using Apache Flink stream processing
I am using Apache Flink for stream processing.

After subscribing to messages from a source (e.g., Kafka, AWS Kinesis Data Streams) and applying transformations, aggregations, etc. to the stream data using Flink operators, I want to buffer the final messages (e.g., 1000 of them) and post each batch to an external REST API in a single request.

How can I implement this buffering mechanism (batches of 1000 records each) in Apache Flink?

Flink pipeline: streaming source --> transform/reduce using operators --> buffer 1000 messages --> post to REST API

Thanks in advance for your help!
I would create a sink with state that holds the incoming messages. When the count gets high enough (1000), the sink sends the batch. The state can be in memory (e.g., an instance variable holding an ArrayList of messages), but you should use checkpoints so that the state can be recovered in case of some kind of failure.

When your sink has checkpointed state, it needs to implement CheckpointedFunction (in org.apache.flink.streaming.api.checkpoint), which means you need to add two methods to your sink:
```java
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkpointedState.clear();

    // HttpSinkStateItem is a user-written class
    // that just holds a collection of messages (Strings, in this case)
    //
    // buffer is declared as ArrayList<String>
    checkpointedState.add(new HttpSinkStateItem(buffer));
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Mix and match different kinds of states as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //     - types are list and union
    //   - Use context.getKeyedStateStore() to get state for the current key
    //     (only for processing keyed streams)
    //     - types are value, list, reducing, aggregating and map
    //   - Distinguish between state data using the state name (e.g. "HttpSink-State")

    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {
        buffer = new ArrayList<>();
        // After rescaling, a subtask may be handed more than one state item,
        // so accumulate rather than overwrite:
        for (HttpSinkStateItem item : checkpointedState.get()) {
            buffer.addAll(item.getPending());
        }
    }
}
```
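Separate from the checkpointing above, the buffer-and-flush logic inside the sink's invoke() can be kept very simple. Here is a minimal sketch of that logic on its own, independent of Flink; BatchBuffer and all names in it are illustrative, not part of any Flink API, and the flusher callback stands in for the code that would POST the batch to the REST API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative buffer-and-flush helper: collects messages and hands off
// a batch to the flusher callback once the threshold is reached.
class BatchBuffer {
    private final int threshold;
    private final Consumer<List<String>> flusher; // e.g. posts the batch to the REST API
    private final List<String> buffer = new ArrayList<>();

    BatchBuffer(int threshold, Consumer<List<String>> flusher) {
        this.threshold = threshold;
        this.flusher = flusher;
    }

    // Called once per incoming record (e.g. from the sink's invoke()).
    void add(String message) {
        buffer.add(message);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Sends whatever is buffered, even if below the threshold.
    void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer)); // hand off a copy of the batch
            buffer.clear();
        }
    }

    int size() {
        return buffer.size();
    }
}
```

The copy handed to the flusher keeps the callback decoupled from the internal buffer, which is cleared immediately after.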
You can also use a timer (if the input stream is keyed/partitioned) to send batches periodically when the count doesn't reach your threshold.
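As a sketch of that timer-based variant: timers are only available in keyed process functions, so one way (an assumption on my part, not the only design) is to do the batching in a KeyedProcessFunction upstream of a plain sink that performs the HTTP POST. All class and state names here are illustrative:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Buffers up to BATCH_SIZE records per key; a processing-time timer
// flushes a partial batch if the threshold isn't reached in time.
// Emits List<String> batches downstream, where a simple sink can POST them.
public class BatchWithTimeout extends KeyedProcessFunction<String, String, List<String>> {
    private static final int BATCH_SIZE = 1000;
    private static final long TIMEOUT_MS = 60_000; // flush at least once a minute

    private transient ListState<String> bufferState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        bufferState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("flush-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out)
            throws Exception {
        bufferState.add(value);

        // Register a flush timer the first time the buffer becomes non-empty.
        if (timerState.value() == null) {
            long timer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }

        List<String> buffer = new ArrayList<>();
        bufferState.get().forEach(buffer::add);
        if (buffer.size() >= BATCH_SIZE) {
            flush(buffer, ctx, out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out)
            throws Exception {
        List<String> buffer = new ArrayList<>();
        bufferState.get().forEach(buffer::add);
        if (!buffer.isEmpty()) {
            flush(buffer, ctx, out);
        }
    }

    private void flush(List<String> batch, Context ctx, Collector<List<String>> out)
            throws Exception {
        out.collect(batch);
        bufferState.clear();
        Long timer = timerState.value();
        if (timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
            timerState.update(null);
        }
    }
}
```

Note that rebuilding the buffer list on every element keeps the sketch short but is O(n) per record; a ValueState counter would avoid that in a production version.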