如何使用 spring 云流仅发送自定义 Header
How to send only custom Header using spring cloud stream
我必须在 kafka 消息中设置自定义 header,我的 kafka 集群原生支持 headers (1.x.x)。我目前正在使用 springCloudVersion=Finchley.RELEASE.
当我设置属性
default:
producer:
headerMode: none
None 个 header 将进入输出。
另一方面,如果我设置
default:
producer:
headerMode: headers
包括contentType
和spring_json_header_types
在内的许多header都进入了header,这极大地影响了吞吐量。 Kafka 是一个 language/framework 不可知的消息传递机制,我觉得 spring 应该提供一种只包含 user-provided headers 的方法。是否有任何变通办法只让 user-set headers 进入 kafka 主题,同时抑制所有 spring 云相关 headers.
我已经在 GitHub 上回答了这个问题。在多个地方问同一个问题是浪费你和我们的时间。正如我在那里所说,Stack Overflow 是提问的首选场所。
这里再次给出答案:
请使用Stack Overflow提问; GitHub 问题用于报告错误或要求新功能。
您可以在 binder configuration 中提供自定义 header 映射器。
spring.cloud.stream.kafka.binder.headerMapperBeanName
The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a DefaultKafkaHeaderMapper that uses JSON deserialization for the headers.
Default: none.
实现您自己的 KafkaHeaderMapper
并将其添加为 @Bean
并配置活页夹以使用它。
编辑
DefaultKafkaHeaderMapper
有一个 属性 可以将 byte[]
header 转换为 String
。
/**
* Set the headers to not perform any conversion on (except {@code String} to
* {@code byte[]} for outbound). Inbound headers that match will be mapped as
* {@code byte[]} unless the corresponding boolean in the map value is true,
* in which case it will be mapped as a String.
* @param rawMappedHeaders the header names to not convert and
* @since 2.2.5
* @see #setCharset(Charset)
* @see #setMapAllStringsOut(boolean)
*/
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {
我能够删除“spring_json_header_types”header:
扩展 DefaultKafkaHeaderMapper:
package com.config.streams;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.messaging.MessageHeaders;
public class KafkaHeaderMapperNoSpringHeader extends DefaultKafkaHeaderMapper {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
super.fromHeaders(headers, target);
target.remove("spring_json_header_types");
}
}
实例化 bean:
@Configuration
public class StreamsConfiguration {
@Bean
public KafkaHeaderMapper kafkaHeaderMapperNoSpringHeader() {
return new KafkaHeaderMapperNoSpringHeader();
} ...
并在application.properties中添加以下属性:
spring.cloud.stream.kafka.binder.headerMapperBeanName = kafkaHeaderMapperNoSpringHeader
我必须在 kafka 消息中设置自定义 header,我的 kafka 集群原生支持 headers (1.x.x)。我目前正在使用 springCloudVersion=Finchley.RELEASE.
当我设置属性
default:
producer:
headerMode: none
None 个 header 将进入输出。
另一方面,如果我设置
default:
producer:
headerMode: headers
包括contentType
和spring_json_header_types
在内的许多header都进入了header,这极大地影响了吞吐量。 Kafka 是一个 language/framework 不可知的消息传递机制,我觉得 spring 应该提供一种只包含 user-provided headers 的方法。是否有任何变通办法只让 user-set headers 进入 kafka 主题,同时抑制所有 spring 云相关 headers.
我已经在 GitHub 上回答了这个问题。在多个地方问同一个问题是浪费你和我们的时间。正如我在那里所说,Stack Overflow 是提问的首选场所。
这里再次给出答案:
请使用Stack Overflow提问; GitHub 问题用于报告错误或要求新功能。
您可以在 binder configuration 中提供自定义 header 映射器。
spring.cloud.stream.kafka.binder.headerMapperBeanName
The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a DefaultKafkaHeaderMapper that uses JSON deserialization for the headers.
Default: none.
实现您自己的 KafkaHeaderMapper
并将其添加为 @Bean
并配置活页夹以使用它。
编辑
DefaultKafkaHeaderMapper
有一个 属性 可以将 byte[]
header 转换为 String
。
/**
* Set the headers to not perform any conversion on (except {@code String} to
* {@code byte[]} for outbound). Inbound headers that match will be mapped as
* {@code byte[]} unless the corresponding boolean in the map value is true,
* in which case it will be mapped as a String.
* @param rawMappedHeaders the header names to not convert and
* @since 2.2.5
* @see #setCharset(Charset)
* @see #setMapAllStringsOut(boolean)
*/
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {
我能够删除“spring_json_header_types”header:
扩展 DefaultKafkaHeaderMapper:
package com.config.streams; import org.apache.kafka.common.header.Headers; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import org.springframework.messaging.MessageHeaders; public class KafkaHeaderMapperNoSpringHeader extends DefaultKafkaHeaderMapper { @Override public void fromHeaders(MessageHeaders headers, Headers target) { super.fromHeaders(headers, target); target.remove("spring_json_header_types"); } }
实例化 bean:
@Configuration public class StreamsConfiguration { @Bean public KafkaHeaderMapper kafkaHeaderMapperNoSpringHeader() { return new KafkaHeaderMapperNoSpringHeader(); } ...
并在application.properties中添加以下属性:
spring.cloud.stream.kafka.binder.headerMapperBeanName = kafkaHeaderMapperNoSpringHeader