如何在消费消息时访问 Kafka headers?
How to access Kafka headers while consuming a message?
以下是我的配置
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
kafka-consumer-context-ref="consumerContext"
auto-startup="true"
channel="inputFromKafka">
<int:poller fixed-delay="1" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
inputFromKafka
下面经过改造
public Message<?> transform(final Message<?> message) {
System.out.println( "KAFKA Message Headers " + message.getHeaders());
final Map<String, Map<Integer, List<Object>>> origData = (Map<String, Map<Integer, List<Object>>>) message.getPayload();
// some code to figure-out the nonPartitionedData
return MessageBuilder.withPayload(nonPartitionedData).build();
}
上面的打印语句只打印两个一致的 headers 而不管
KAFKA Message Headers {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}
在发送 Kafka 消息时,一些 headers 已通过,包括 KafkaHeaders.MESSAGE_KEY
但我也没有回复,想知道是否有办法完成此操作?
不幸的是,它不是那样工作的...
Producer
部分 (KafkaProducerMessageHandler
) 如下所示:
this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
如您所见,我们不会向 Kafka topic
发送任何 messageHeaders
。仅 payload
并且恰好在 Kafka 协议指定的 messageKey
之下。
从另一边 Consumer
边 (KafkaHighLevelConsumerMessageSource
) 执行此逻辑:
if (!payloadMap.containsKey(messageAndMetadata.partition())) {
final List<Object> payload = new ArrayList<Object>();
payload.add(messageAndMetadata.message());
payloadMap.put(messageAndMetadata.partition(), payload);
}
如您所见,我们不关心 messageKey
。
KafkaMessageDrivenChannelAdapter
(<int-kafka:message-driven-channel-adapter>
) 适合你!它在将消息发送到频道之前执行此操作:
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);
Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());
if (!this.autoCommitOffset) {
rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
如前所述,Kafka 中没有消息的概念 headers。因为我过去曾遇到过同样的问题,所以我编写了一个 small library 来帮助解决这个问题。它可能会派上用场。
以下是我的配置
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
kafka-consumer-context-ref="consumerContext"
auto-startup="true"
channel="inputFromKafka">
<int:poller fixed-delay="1" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
inputFromKafka
下面经过改造
public Message<?> transform(final Message<?> message) {
System.out.println( "KAFKA Message Headers " + message.getHeaders());
final Map<String, Map<Integer, List<Object>>> origData = (Map<String, Map<Integer, List<Object>>>) message.getPayload();
// some code to figure-out the nonPartitionedData
return MessageBuilder.withPayload(nonPartitionedData).build();
}
上面的打印语句只打印两个一致的 headers 而不管
KAFKA Message Headers {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}
在发送 Kafka 消息时,一些 headers 已通过,包括 KafkaHeaders.MESSAGE_KEY
但我也没有回复,想知道是否有办法完成此操作?
不幸的是,它不是那样工作的...
Producer
部分 (KafkaProducerMessageHandler
) 如下所示:
this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
如您所见,我们不会向 Kafka topic
发送任何 messageHeaders
。仅 payload
并且恰好在 Kafka 协议指定的 messageKey
之下。
从另一边 Consumer
边 (KafkaHighLevelConsumerMessageSource
) 执行此逻辑:
if (!payloadMap.containsKey(messageAndMetadata.partition())) {
final List<Object> payload = new ArrayList<Object>();
payload.add(messageAndMetadata.message());
payloadMap.put(messageAndMetadata.partition(), payload);
}
如您所见,我们不关心 messageKey
。
KafkaMessageDrivenChannelAdapter
(<int-kafka:message-driven-channel-adapter>
) 适合你!它在将消息发送到频道之前执行此操作:
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);
Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());
if (!this.autoCommitOffset) {
rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
如前所述,Kafka 中没有消息的概念 headers。因为我过去曾遇到过同样的问题,所以我编写了一个 small library 来帮助解决这个问题。它可能会派上用场。