Header Kafka 中的关联 ID

Correlation ID in Header Kafka

我正在为 kafka 使用 kafka conluent c# 客户端,并希望实现 Req-Resp 模式。 我在请求中设置 Reply-to 和关联 ID header (https://github.com/nestjs/nest/blob/master/packages/microservices/enums/kafka-headers.enum.ts) CORRELATION_ID/REPLY_TOPIC。

我在“服务器”端收到这些 header 并回复到 REPLY_TOPIC header 的价值主题。 我可能应该在响应 header 时设置 CORRELATION_ID。

所以问题是:

  1. kafka 内部是否有任何魔法可以在消息 header 上查找 CORRELATION_ID 并通过适当的设置推送给消费者?
  2. MB 有人知道并指出我如何 spring-kafka 实施 req-resp 模式?

Kafka 没有内置任何东西可以做到这一点; spring-kafka 出于这些目的使用自己的自定义 headers。参见 KafkaHeaders

如果您在服务器端 (@KafkaListener) 使用 Spring,则需要设置 headers。然后,框架将相关 ID 回显到回复中,并将主题用作目标。

如果您正在编写自己的服务器代码,您也需要这样做。

    /**
     * The prefix for Kafka headers.
     */
    public static final String PREFIX = "kafka_";

...

    /**
     * The header containing information to correlate requests/replies.
     * Type: byte[].
     * @since 2.1.3
     */
    public static final String CORRELATION_ID = PREFIX + "correlationId";

    /**
     * The header containing the default reply topic.
     * Type: byte[].
     * @since 2.1.3
     */
    public static final String REPLY_TOPIC = PREFIX + "replyTopic";