使用 CompletableFuture 在 Flink Sink 中进行异步操作
Async Operation in A Flink Sink using CompletableFuture
背景
计划使用 Flink 设置数据管道。
流程如下所示
Kafka --> Flink Job --> gRPC endpoint
到目前为止的故事
- 阻止实施已启动 运行。但这不会扩展到高 QPS
- 尝试模拟异步行为
问题
- 对于异步行为,不确定行为如何
- 如果使用
CompletableFuture
,每条消息将以 Async
方式处理,但是否会在第一条消息处理完成之前提取下一条消息进行处理?换句话说,有一种方法可以在任务管理器中实现异步处理。但是任务管理器在获取下一条消息/元组时的行为是什么?将等到 Async
过程完成还是提交给 CompletableFuture / Thread 并获取下一条消息?不清楚
- 如果不使用自定义线程池是否会导致任何问题
shutdown
,因为管道将在很长一段时间内 运行?
- 在 Flink sink 中实现异步行为的任何其他解决方案?
我会利用 Flink 对异步运算符的支持,并有一个 DiscardingSink
,而不是尝试实现自定义异步接收器。
不,我看不出任何持久线程池会导致问题的原因。
背景
计划使用 Flink 设置数据管道。
流程如下所示
Kafka --> Flink Job --> gRPC endpoint
到目前为止的故事
- 阻止实施已启动 运行。但这不会扩展到高 QPS
- 尝试模拟异步行为
问题
- 对于异步行为,不确定行为如何
- 如果使用
CompletableFuture
,每条消息将以Async
方式处理,但是否会在第一条消息处理完成之前提取下一条消息进行处理?换句话说,有一种方法可以在任务管理器中实现异步处理。但是任务管理器在获取下一条消息/元组时的行为是什么?将等到Async
过程完成还是提交给 CompletableFuture / Thread 并获取下一条消息?不清楚 - 如果不使用自定义线程池是否会导致任何问题
shutdown
,因为管道将在很长一段时间内 运行? - 在 Flink sink 中实现异步行为的任何其他解决方案?
我会利用 Flink 对异步运算符的支持,并有一个 DiscardingSink
,而不是尝试实现自定义异步接收器。
不,我看不出任何持久线程池会导致问题的原因。