多个消费者使用 spring kafka,回复主题写在 spring 之外

Multiple consumers using spring kafka with reply topic being written outside spring

我正在使用 Spring-Kafka 并尝试实现请求回复模式。我的用例是,客户端使用有效负载调用 Rest 端点,我将此消息发送到 kafka 主题(请求主题)。我有 spark 作业,它使用此消息、处理它并在另一个 kafka 主题 (reply-topic) 中发送响应。将消息写回回复主题后,我的 Web 应用程序应该使用此消息并将 return 作为对客户端的 http 响应。

到目前为止我取得了什么。

我正在使用 Spring-Kafka 来解决这个用例。我能够将请求 body 作为 kafka 消息发送到请求主题。在发送 kafka 消息之前,Spring-Kafka 正在生成一个 kafka_correlationId 作为 kafka header。我已经注册了 producerInterceptor,掌握了生成的 correlationId 并在消息 body.

中传递了它

在 spark 作业中,我能够使用此 kafka 消息,处理它并在 reply-topic 中发回时,我正在添加消息 header kafka_correlationId,其值与什么相同已生成。

当我只有一个消费者时,用例运行得非常好。

什么不起作用。

现在,我已经部署了我的 Web 应用程序的 2 个实例,reply-topic 有 2 个具有相同消费者组 ID 的分区。

App-instance-1 : consuming from partition-0
App-instance-2 : consuming from partition-1

如果我的请求转到 App-instance-1 并且如果我的 spark 作业能够写入 reply-topic 的 partion-0,我就能得到响应。但是,如果 spark 作业在 reply-topic 中写入 partion-1,因为 App-instance-1 仅侦听 partion-0,我无法获得响应并且应用程序因超时异常而失败。另一个应用程序实例的类似情况。

请让我知道我应该配置什么来实现这个。

有两种解法-

  • 还设置 REPLY_PARTITION header(并让 spark 应用程序将回复发送到该分区)- 并静态分配分区。
  • 在每个实例中使用不同的 group.id,以便两个客户端都能得到回复 - 在这种情况下,设置 sharedReplyTopic 以避免当一个实例得到他没有的请求的回复时的日志噪声'发送.

参见 the documentation

When you configure with a single reply TopicPartitionOffset, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition. When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply, but only the instance that sent the request finds the correlation ID. This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply. When you use this setting, we recommend that you set the template’s sharedReplyTopic to true, which reduces the logging level of unexpected replies to DEBUG instead of the default ERROR.

我正在回答我自己的问题,以便它可以帮助其他人。

根据@Gary Russel 的输入,我继续在消息中设置 REPLY_PARTITION header。

我没有将分区静态分配给消费者(因为我不确定我的应用程序中将有多少消费者),而是选择动态识别分配的分区并将其传递到 REPLY_PARTITION header。下面是实现相同目的的代码。

定义侦听 reply-topic.

的 bean
@Bean
public KafkaMessageListenerContainer<String, JsonNode> replyContainer(ConsumerFactory<String, JsonNode> cf) {
    ContainerProperties containerProperties = new ContainerProperties("reply-topic");
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

要获得分配的分区,

@Autowired
private KafkaMessageListenerContainer<String, JsonNode> replyContainer;

    /**
     * <P>
     *     gets first assigned partition
     * </P>
     * @return
     */
    private Integer getAssignedPartition() throws Exception {
        Integer partitionId = null;
        if (replyContainer.getAssignedPartitions() != null) {
            for (TopicPartition assignedPartition : replyContainer.getAssignedPartitions()) {
                if(assignedPartition.topic().equalsIgnoreCase("reply-topic")){
                    partitionId = assignedPartition.partition();
                }
            }
        }
        if(partitionId == null){
            //throw exception
        }
        return partitionId;
    }

    /**
     * <P>
     *     int to byte array
     * </P>
     * @param value
     * @return
     */
    private static byte[] toByteArray(int value) {
        return new byte[] {
                (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value
        };
    }

发送等待的方法reply-topic.

/**
 * <P>
 *     Sends message to request topic and wait for reply-topic response.
 * </P>
 * @param request
 * @return
 * @throws Exception
 */
public JsonNode process(JsonNode request) throws Exception {
    // create producer record
    Integer partitionId = getAssignedPartition();
    ProducerRecord<String, JsonNode> record = new ProducerRecord<>(requestTopic, request);
    // set reply topic in header
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, toByteArray(partitionId) ));
    RequestReplyFuture<String, JsonNode, JsonNode> sendAndReceive = kafkaTemplate.sendAndReceive(record);
    
    // get consumer record
    ConsumerRecord<String, JsonNode> consumerRecord = sendAndReceive.get(2, TimeUnit.SECONDS);
    // return consumer value
    return consumerRecord.value();
}