无论如何在 Flink 1.15 中结合 KafkaSource 和 ThrottledIterator?
Is there anyway to combine KafkaSource and ThrottledIterator in Flink 1.15?
有很多关于速率限制/节流 Kafka 消费者的老话题
- Dynamically throttle flink kafka sources
- 等等
但是其中none个可以用在case 1.15:
KafkaFetcher
不暴露emitRecord
- Now it resides里面的
KafkaRecordEmitter
FlinkKafkaConsumer
是 deprecated
KafkaSource
是首选方法
KafkaSource
explicitly creates一个KafkaRecordEmitter
里面是createReader
所以,我的问题是,是否可以将 ThrottledIterator
与 KafkaSource
结合起来?
您不应在主处理线程中应用速率限制,因为这 can/will 会阻止检查点。您应该做的是在反序列化模式中应用速率限制。
您可以在反序列化模式的 open
方法中设置速率限制器,并在 deserialize
方法中获取它。 (但我会为此使用 Guava RateLimiter 而不是节流迭代器。)
有很多关于速率限制/节流 Kafka 消费者的老话题
- Dynamically throttle flink kafka sources
- 等等
但是其中none个可以用在case 1.15:
KafkaFetcher
不暴露emitRecord
- Now it resides里面的
KafkaRecordEmitter
- Now it resides里面的
FlinkKafkaConsumer
是 deprecatedKafkaSource
是首选方法
KafkaSource
explicitly creates一个KafkaRecordEmitter
里面是createReader
所以,我的问题是,是否可以将 ThrottledIterator
与 KafkaSource
结合起来?
您不应在主处理线程中应用速率限制,因为这 can/will 会阻止检查点。您应该做的是在反序列化模式中应用速率限制。
您可以在反序列化模式的 open
方法中设置速率限制器,并在 deserialize
方法中获取它。 (但我会为此使用 Guava RateLimiter 而不是节流迭代器。)