Kafka Headers 作为批处理模式的列表
Kafka Headers as List in batch mode
我正在尝试访问某些 headers,同时以批处理模式使用消息。
如果我设置侦听器来处理 Message<?>
我可以手动提取 headers
@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
for (int i = 0; i < data.size(); ++) {
Object message = data.get(i).getPayload();
MessageHeaders mh = data.get(i).getHeaders();
Object value = mh.get("test");
我想为我完成其中一些,但是当我尝试时
@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
@Header (KafkaHeaders.OFFSET) List<Integer> offsets,
@Header ("test") List<String> testHeaders,
Acknowledgment ack) throws SQLException {
我明白了 MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list]
但是,此方法适用于偏移 header.
这是因为有内置代码来处理标准 header 并且这种方法不能用于自定义代码,还是我错过了一些可以使这种方法起作用的东西?
是的,框架只映射它知道的 header;它将所有其他映射的 header 放入
/**
* The header for a list of Maps of converted native Kafka headers. Used for batch
* listeners; the map at a particular list position corresponds to the data in the
* payload list position.
*/
public static final String BATCH_CONVERTED_HEADERS = PREFIX + "batchConvertedHeaders";
如果您想要 header 的离散映射,则需要创建自定义 BatchMessageConverter
- 可能是 BatchMessagingMessageConverter
.
的子类
可能最简单的方法是覆盖 this method,调用 super.toMessage()
,然后添加您的 header。
return MessageBuilder.fromMessage(super.toMessage(...))
.setHeader("test", ...)
.build();
如果您正在使用 Spring 引导,只需将转换器作为 bean 添加,引导就会将其连接。
否则将转换器添加到容器工厂。
编辑
如果消息转换器没有header映射器;所有 header 都放在 header KafkaHeaders.NATIVE_HEADERS
中,这是 List<Headers>
.
我正在尝试访问某些 headers,同时以批处理模式使用消息。
如果我设置侦听器来处理 Message<?>
我可以手动提取 headers
@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
for (int i = 0; i < data.size(); ++) {
Object message = data.get(i).getPayload();
MessageHeaders mh = data.get(i).getHeaders();
Object value = mh.get("test");
我想为我完成其中一些,但是当我尝试时
@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
@Header (KafkaHeaders.OFFSET) List<Integer> offsets,
@Header ("test") List<String> testHeaders,
Acknowledgment ack) throws SQLException {
我明白了 MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list]
但是,此方法适用于偏移 header.
这是因为有内置代码来处理标准 header 并且这种方法不能用于自定义代码,还是我错过了一些可以使这种方法起作用的东西?
是的,框架只映射它知道的 header;它将所有其他映射的 header 放入
/**
* The header for a list of Maps of converted native Kafka headers. Used for batch
* listeners; the map at a particular list position corresponds to the data in the
* payload list position.
*/
public static final String BATCH_CONVERTED_HEADERS = PREFIX + "batchConvertedHeaders";
如果您想要 header 的离散映射,则需要创建自定义 BatchMessageConverter
- 可能是 BatchMessagingMessageConverter
.
可能最简单的方法是覆盖 this method,调用 super.toMessage()
,然后添加您的 header。
return MessageBuilder.fromMessage(super.toMessage(...))
.setHeader("test", ...)
.build();
如果您正在使用 Spring 引导,只需将转换器作为 bean 添加,引导就会将其连接。
否则将转换器添加到容器工厂。
编辑
如果消息转换器没有header映射器;所有 header 都放在 header KafkaHeaders.NATIVE_HEADERS
中,这是 List<Headers>
.