如何使用 spring 云流仅发送自定义 Header

How to send only custom Header using spring cloud stream

我必须在 kafka 消息中设置自定义 header,我的 kafka 集群原生支持 headers (1.x.x)。我目前正在使用 springCloudVersion=Finchley.RELEASE.

当我设置属性

default:
    producer:
      headerMode: none

None 个 header 将进入输出。

另一方面,如果我设置

default:
    producer:
      headerMode: headers

包括contentTypespring_json_header_types在内的许多header都进入了header,这极大地影响了吞吐量。 Kafka 是一个 language/framework 不可知的消息传递机制,我觉得 spring 应该提供一种只包含 user-provided headers 的方法。是否有任何变通办法只让 user-set headers 进入 kafka 主题,同时抑制所有 spring 云相关 headers.

我已经在 GitHub 上回答了这个问题。在多个地方问同一个问题是浪费你和我们的时间。正如我在那里所说,Stack Overflow 是提问的首选场所。

这里再次给出答案:

请使用Stack Overflow提问; GitHub 问题用于报告错误或要求新功能。

您可以在 binder configuration 中提供自定义 header 映射器。

spring.cloud.stream.kafka.binder.headerMapperBeanName

The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a DefaultKafkaHeaderMapper that uses JSON deserialization for the headers.

Default: none.

实现您自己的 KafkaHeaderMapper 并将其添加为 @Bean 并配置活页夹以使用它。

编辑

DefaultKafkaHeaderMapper 有一个 属性 可以将 byte[] header 转换为 String

/**
 * Set the headers to not perform any conversion on (except {@code String} to
 * {@code byte[]} for outbound). Inbound headers that match will be mapped as
 * {@code byte[]} unless the corresponding boolean in the map value is true,
 * in which case it will be mapped as a String.
 * @param rawMappedHeaders the header names to not convert and
 * @since 2.2.5
 * @see #setCharset(Charset)
 * @see #setMapAllStringsOut(boolean)
 */
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {

我能够删除“spring_json_header_types”header:

  1. 扩展 DefaultKafkaHeaderMapper:

    package com.config.streams;    
    import org.apache.kafka.common.header.Headers;
    import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
    import org.springframework.messaging.MessageHeaders;
    
    public class KafkaHeaderMapperNoSpringHeader extends DefaultKafkaHeaderMapper {
    
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            super.fromHeaders(headers, target);
            target.remove("spring_json_header_types");
        }
    
    }
    
  2. 实例化 bean:

    @Configuration
    public class StreamsConfiguration {
    
           @Bean
           public KafkaHeaderMapper kafkaHeaderMapperNoSpringHeader() {
               return new KafkaHeaderMapperNoSpringHeader();
           } ...
    
  3. 并在application.properties中添加以下属性:

    spring.cloud.stream.kafka.binder.headerMapperBeanName = kafkaHeaderMapperNoSpringHeader