spring kafka 映射 headers 入站和出站的不同
spring kafka map headers differently for inbound vs outbound
我们正在使用 spring-kafka,对于与我们通信但未设置 spring_json_header_types header 的 non-spring 应用,我们指定某些 headers 应该通过将它们添加为 rawMappedHeaders 来映射为字符串。我们通过设置配置为所有活页夹执行此操作:
spring.cloud.stream.kafka.binder.headerMapperBeanName 作为我们设置 rawMappedHeaders 的默认 KafkaHeaderMapper。这适用于我们拥有的生产者和消费者绑定。
但是,当我们向某些 spring 应用程序出站发送数据时,我们实际上不想遗漏那些 rawMappedHeader 字段的 spring_json_header_types 值,因为下游应用程序没有已针对此 non-spring 应用通信进行调整。
有没有一种方法可以通过配置或代码将 headerMapperBeanName 仅应用于所有生产者或仅应用于所有消费者,以便我可以让它们映射 header出站和入站有什么不同?还是有 DefaultKafkaHeaderMapper 本身可以处理的更好的方法?
活页夹级别只有一个 header 映射器。
创建自定义 KafkaHeaderMapper
实现可能是最简单的方法,该实现将入站和出站映射委托给不同的 DefaultKafkaHeaderMapper
。
public interface KafkaHeaderMapper {
/**
* Map from the given {@link MessageHeaders} to the specified target headers.
* @param headers the abstracted MessageHeaders.
* @param target the native target headers.
*/
void fromHeaders(MessageHeaders headers, Headers target);
/**
* Map from the given native headers to a map of headers for the eventual
* {@link MessageHeaders}.
* @param source the native headers.
* @param target the target headers.
*/
void toHeaders(Headers source, Map<String, Object> target);
}
我们正在使用 spring-kafka,对于与我们通信但未设置 spring_json_header_types header 的 non-spring 应用,我们指定某些 headers 应该通过将它们添加为 rawMappedHeaders 来映射为字符串。我们通过设置配置为所有活页夹执行此操作: spring.cloud.stream.kafka.binder.headerMapperBeanName 作为我们设置 rawMappedHeaders 的默认 KafkaHeaderMapper。这适用于我们拥有的生产者和消费者绑定。
但是,当我们向某些 spring 应用程序出站发送数据时,我们实际上不想遗漏那些 rawMappedHeader 字段的 spring_json_header_types 值,因为下游应用程序没有已针对此 non-spring 应用通信进行调整。
有没有一种方法可以通过配置或代码将 headerMapperBeanName 仅应用于所有生产者或仅应用于所有消费者,以便我可以让它们映射 header出站和入站有什么不同?还是有 DefaultKafkaHeaderMapper 本身可以处理的更好的方法?
活页夹级别只有一个 header 映射器。
创建自定义 KafkaHeaderMapper
实现可能是最简单的方法,该实现将入站和出站映射委托给不同的 DefaultKafkaHeaderMapper
。
public interface KafkaHeaderMapper {
/**
* Map from the given {@link MessageHeaders} to the specified target headers.
* @param headers the abstracted MessageHeaders.
* @param target the native target headers.
*/
void fromHeaders(MessageHeaders headers, Headers target);
/**
* Map from the given native headers to a map of headers for the eventual
* {@link MessageHeaders}.
* @param source the native headers.
* @param target the target headers.
*/
void toHeaders(Headers source, Map<String, Object> target);
}