Kafka Headers 作为批处理模式的列表

Kafka Headers as List in batch mode

我正在尝试访问某些 headers,同时以批处理模式使用消息。 如果我设置侦听器来处理 Message<?> 我可以手动提取 headers

@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
  for (int i = 0; i < data.size(); ++) {
    Object message = data.get(i).getPayload();
    MessageHeaders mh = data.get(i).getHeaders();
    Object value = mh.get("test");

我想为我完成其中一些,但是当我尝试时

@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
  @Header (KafkaHeaders.OFFSET) List<Integer> offsets,
  @Header ("test") List<String> testHeaders,
  Acknowledgment ack) throws SQLException {

我明白了 MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list] 但是,此方法适用于偏移 header.

这是因为有内置代码来处理标准 header 并且这种方法不能用于自定义代码,还是我错过了一些可以使这种方法起作用的东西?

是的,框架只映射它知道的 header;它将所有其他映射的 header 放入

/**
 * The header for a list of Maps of converted native Kafka headers. Used for batch
 * listeners; the map at a particular list position corresponds to the data in the
 * payload list position.
 */
public static final String BATCH_CONVERTED_HEADERS = PREFIX + "batchConvertedHeaders";

如果您想要 header 的离散映射,则需要创建自定义 BatchMessageConverter - 可能是 BatchMessagingMessageConverter.

的子类

可能最简单的方法是覆盖 this method,调用 super.toMessage(),然后添加您的 header。

return MessageBuilder.fromMessage(super.toMessage(...))
    .setHeader("test", ...)
    .build();

如果您正在使用 Spring 引导,只需将转换器作为 bean 添加,引导就会将其连接。

否则将转换器添加到容器工厂。

编辑

如果消息转换器没有header映射器;所有 header 都放在 header KafkaHeaders.NATIVE_HEADERS 中,这是 List<Headers>.