如何检查 Ruby-Kafka 重试是否有效?
How to check to see if Ruby-Kafka retries works?
文档中提到生产者根据 max_retries
.
重试将消息发送到队列
所以我关闭了 Kafka,然后尝试了我的生产者。我收到此错误
Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)
这是有道理的,但是 retries
之后就再也没有发生过。我已经彻底阅读了该文档,但我无法弄清楚 retries
实际上将如何触发?
这是我的代码:
def self.deliver_message(kafka, message, topic, transactional_id)
producer = kafka.producer(idempotent: true,
transactional_id: transactional_id,
required_acks: :all,
max_retries: 5,
retry_backoff: 5)
producer.produce(message, topic: topic)
producer.deliver_messages
end
link 到文档:
https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method
提前致谢。
重试基于生产者回调抛出的异常类型。根据 Callback Docs 回调期间可能发生以下异常:
The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:
Non-Retriable exceptions (fatal, the message will never be sent):
- InvalidTopicException
- OffsetMetadataTooLargeException
- RecordBatchTooLargeException
- RecordTooLargeException
- UnknownServerException
Retriable exceptions (transient, may be covered by increasing #.retries):
- CorruptRecordException
- InchvalidMetadataException
- NotEnoughReplicasAfterAppendException
- NotEnoughReplicasException
- OffsetOutOfRangeException
- 超时异常
- UnknownTopicOrPartitionException
完全关闭 Kafka 看起来像是一个不可重试的异常。
文档中提到生产者根据 max_retries
.
所以我关闭了 Kafka,然后尝试了我的生产者。我收到此错误
Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)
这是有道理的,但是 retries
之后就再也没有发生过。我已经彻底阅读了该文档,但我无法弄清楚 retries
实际上将如何触发?
这是我的代码:
def self.deliver_message(kafka, message, topic, transactional_id)
producer = kafka.producer(idempotent: true,
transactional_id: transactional_id,
required_acks: :all,
max_retries: 5,
retry_backoff: 5)
producer.produce(message, topic: topic)
producer.deliver_messages
end
link 到文档:
https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method
提前致谢。
重试基于生产者回调抛出的异常类型。根据 Callback Docs 回调期间可能发生以下异常:
The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:
Non-Retriable exceptions (fatal, the message will never be sent):
- InvalidTopicException
- OffsetMetadataTooLargeException
- RecordBatchTooLargeException
- RecordTooLargeException
- UnknownServerException
Retriable exceptions (transient, may be covered by increasing #.retries):
- CorruptRecordException
- InchvalidMetadataException
- NotEnoughReplicasAfterAppendException
- NotEnoughReplicasException
- OffsetOutOfRangeException
- 超时异常
- UnknownTopicOrPartitionException
完全关闭 Kafka 看起来像是一个不可重试的异常。