KafkaHeaders.RECEIVED_MESSAGE_KEY 对比 KafkaHeaders.MESSAGE_KEY header 在 Spring 云上
KafkaHeaders.RECEIVED_MESSAGE_KEY vs KafkaHeaders.MESSAGE_KEY header on Spring Cloud
我正在使用 spring 云流。
我想知道 KafkaHeaders.RECEIVED_MESSAGE_KEY
和 KafkaHeaders.MESSAGE_KEY
之间有什么区别
我有 2 个项目,第一个使用 KafkaHeaders.MESSAGE_KEY
作为 header:
生成消息
public void sendResponse(ThirdPartyResponse thirdPartyResponse) {
log.info("Sending response of type 'completed' [{}].", thirdPartyResponse);
integrations.send(
withPayload(ApplicationSubmissionSuccessPayload.success(thirdPartyResponse))
.setHeader(KafkaHeaders.MESSAGE_KEY, thirdPartyResponse.getData().getApplicationId())
.build());
}
第二个使用 KafkaHeaders.RECEIVED_MESSAGE_KEY
@StreamListener(target = "ofaOut")
public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String applicationId, @Payload String payload) throws JsonProcessingException {
...
}
但是我得到了这个错误
2020-03-23 16:13:27.924 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.String], failedMessage=GenericMessage [payload=byte[739],
headers={kafka_offset=285, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67c19b7c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME,
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0,
contentType=application/json, kafka_receivedTopic=com.product.foo.ofa.out, kafka_receivedTimestamp=1584715870225, kafka_groupId=aop-foo-kyc}]
缺少 header
Missing header 'kafka_receivedMessageKey'
我该如何解决?
RECEIVED...
设置在入站消息上;另一个是应用程序指定出站消息的键值。
它们是不同的,以避免在应用程序收到消息时发生意外传播并执行某些工作并且 re-publishes 消息发送到另一个主题。
使用 Spring 集成时,headers 会在消息遍历流时自动复制。
出站消息映射器不映射 RECEIVED...
headers,因此它们不会出现在 ProducerRecord
.
中
... kafka_receivedMessageKey=null ...
表示入站记录上的键为空。
要接收空密钥,请使用
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
我正在使用 spring 云流。
我想知道 KafkaHeaders.RECEIVED_MESSAGE_KEY
和 KafkaHeaders.MESSAGE_KEY
我有 2 个项目,第一个使用 KafkaHeaders.MESSAGE_KEY
作为 header:
public void sendResponse(ThirdPartyResponse thirdPartyResponse) {
log.info("Sending response of type 'completed' [{}].", thirdPartyResponse);
integrations.send(
withPayload(ApplicationSubmissionSuccessPayload.success(thirdPartyResponse))
.setHeader(KafkaHeaders.MESSAGE_KEY, thirdPartyResponse.getData().getApplicationId())
.build());
}
第二个使用 KafkaHeaders.RECEIVED_MESSAGE_KEY
@StreamListener(target = "ofaOut")
public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String applicationId, @Payload String payload) throws JsonProcessingException {
...
}
但是我得到了这个错误
2020-03-23 16:13:27.924 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.String], failedMessage=GenericMessage [payload=byte[739],
headers={kafka_offset=285, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67c19b7c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME,
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0,
contentType=application/json, kafka_receivedTopic=com.product.foo.ofa.out, kafka_receivedTimestamp=1584715870225, kafka_groupId=aop-foo-kyc}]
缺少 header
Missing header 'kafka_receivedMessageKey'
我该如何解决?
RECEIVED...
设置在入站消息上;另一个是应用程序指定出站消息的键值。
它们是不同的,以避免在应用程序收到消息时发生意外传播并执行某些工作并且 re-publishes 消息发送到另一个主题。
使用 Spring 集成时,headers 会在消息遍历流时自动复制。
出站消息映射器不映射 RECEIVED...
headers,因此它们不会出现在 ProducerRecord
.
... kafka_receivedMessageKey=null ...
表示入站记录上的键为空。
要接收空密钥,请使用
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)