在 Spring Cloud Streams 中将消息发送到 DLQ 之前添加自定义信息

Adding custom info before message is sent to DLQ in Spring Cloud Streams

我正在使用 Spring Cloud Streams 和默认的 Spring 重试机制,仅使用属性。它运作良好,消息被重试,然后转到 DLQ……到目前为止一切都很好。现在问题来了...

我需要在消息从我的服务发送到 DLQ 之前在消息中添加一些自定义信息。它们足够简单,可以帮助我识别失败的消息,而无需触及通用有效负载。

可能我可以添加自定义 headers 或将其包装在已知模型中,在那里我可以检索我需要的信息 - 无论哪种方式我都需要 intercept/modify 消息。

最简单且成本不高的方法是什么?我的意思是,我们使用简单的配置来进行重试,所以 'cost' 我的意思是用其他东西交换配置。无论如何谢谢!

使用 Kafka 绑定器,您可以将 ProducerInterceptor 添加到 kafka 生产者配置 interceptor.classes

/**
 * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
 * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
 * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
 * <p>
 * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
 * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
 * not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
 * same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
 * as expected.
 * <p>
 * Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
 * Most often, it should be the same topic/partition from 'record'.
 * <p>
 * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
 * <p>
 * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
 * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
 * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
 * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
 * the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
 * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
 * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
 * is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
 * or otherwise the client.
 *
 * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
 * @return producer record to send to topic/partition
 */
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

生产者记录包含目标主题名称;你可以 add/remove headers 那里。

目前没有针对 RabbitMQ 活页夹的类似挂钩。如果您正在使用该活页夹,请针对活页夹在 GitHub 上打开新功能问题。