如何检查 Ruby-Kafka 重试是否有效?

How to check to see if Ruby-Kafka retries works?

文档中提到生产者根据 max_retries.

重试将消息发送到队列

所以我关闭了 Kafka,然后尝试了我的生产者。我收到此错误

Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)

这是有道理的,但是 retries 之后就再也没有发生过。我已经彻底阅读了该文档,但我无法弄清楚 retries 实际上将如何触发?

这是我的代码:

 def self.deliver_message(kafka, message, topic, transactional_id)
      producer = kafka.producer(idempotent: true,
                                transactional_id: transactional_id,
                                required_acks: :all,
                                max_retries: 5,
                                retry_backoff: 5)
      producer.produce(message, topic: topic)
      producer.deliver_messages
    end

link 到文档:

https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method

提前致谢。

重试基于生产者回调抛出的异常类型。根据 Callback Docs 回调期间可能发生以下异常:

The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:

Non-Retriable exceptions (fatal, the message will never be sent):

  • InvalidTopicException
  • OffsetMetadataTooLargeException
  • RecordBatchTooLargeException
  • RecordTooLargeException
  • UnknownServerException

Retriable exceptions (transient, may be covered by increasing #.retries):

  • CorruptRecordException
  • InchvalidMetadataException
  • NotEnoughReplicasAfterAppendException
  • NotEnoughReplicasException
  • OffsetOutOfRangeException
  • 超时异常
  • UnknownTopicOrPartitionException

完全关闭 Kafka 看起来像是一个不可重试的异常。