Concurrent processing with partition-based ordering in Reactor Kafka

I am working on a sample application that reads from the different partitions of a Kafka topic, concurrently processes the records ordered per partition, and writes the records to different partitions of another topic. Here is the sample code I wrote:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import reactor.core.Disposable;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOffset;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.kafka.receiver.ReceiverRecord;
    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;

    public class MetricsTransposer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    static abstract class SetKafkaProperties {
        final String SOURCE_TOPIC;
        final String DESTINATION_TOPIC;
        final Map<String, Object> consumerProps;
        final Map<String, Object> producerProps;

        SetKafkaProperties(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            SOURCE_TOPIC = sourceTopic;
            DESTINATION_TOPIC = destTopic;

            consumerProps = new HashMap<String, Object>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group-" + System.currentTimeMillis());
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0"); 
            if(consumerPropsOverride != null) {
                consumerProps.putAll(consumerPropsOverride);
            }

            producerProps = new HashMap<String, Object>();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0");
            producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            if(producerPropsOverride != null) {
                producerProps.putAll(producerPropsOverride);
            }
        }
    }

    static class ReactiveTranspose extends SetKafkaProperties {

        SenderOptions<Integer, String> senderOptions =
            SenderOptions.<Integer, String>create(producerProps)
                .maxInFlight(1024);

        KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);

        ReceiverOptions<Integer, String> receiverOptions =
            ReceiverOptions.<Integer, String>create(consumerProps)
                .subscription(Collections.singleton(SOURCE_TOPIC));


        ReactiveTranspose(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
        }

        public Disposable ReadProcessWriteRecords() {
            Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
            return KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                // Group records by partition so per-partition ordering is preserved
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                    // publishOn moves each partition's processing onto a pool thread
                    partitionFlux.publishOn(scheduler)
                        .map(r -> processRecord(partitionFlux.key(), r))
                        // sample keeps only the latest offset in each 5s window,
                        // batching commits instead of committing every record
                        .sample(Duration.ofMillis(5000))
                        .concatMap(offset -> offset.commit()))
                .subscribe();
        }

        private ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, String> message) {
            System.out.printf("Processing record {} from partition {} in thread{}",
                message.value(), topicPartition, Thread.currentThread().getName());
            return message.receiverOffset();
        }
    }

    public static void RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
        transpose.ReadProcessWriteRecords();
    }

    public static void main(String[] args) throws Exception {
        String sourceTopic = "metrics";
        String destinationTopic = "cleanmetrics";

        RunReactiveTranformProcess(sourceTopic, destinationTopic);

    }
}

When I run the application, I don't see the print statements in the console. I do have data in the topic to consume, so I'm wondering whether the code is even connecting to the topic. I'm looking for help figuring out how to check whether it is connected to the topic and reading messages, or what else might be wrong here.

I'm new to Java, reactive programming, and Kafka. This is a self-learning project, and most likely I'm missing something simple and obvious.

More information: here is a snapshot of my logs. I have a topic named metrics with 3 partitions.

Update: I was not seeing the print statements because, although my topic had data in it, auto.offset.reset was set to latest. Changing it to earliest picked up the existing data.
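
For illustration, here is a minimal sketch of that change using the consumerPropsOverride parameter the constructor above already accepts (topic names as in main):

    // Sketch: override the default "latest" so that, with no committed
    // offsets yet, the consumer starts from the beginning of each partition.
    Map<String, Object> consumerOverrides = new HashMap<>();
    consumerOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    ReactiveTranspose transpose = new ReactiveTranspose(
        consumerOverrides, null, BOOTSTRAP_SERVERS, "metrics", "cleanmetrics");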

Your problem is here:

    public void ReadProcessWriteRecords() {
        Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");

        // Here you are ignoring the return value.
        // Nothing happens until you subscribe,
        // so this is merely a statement, not an execution.
        KafkaReceiver.create(receiverOptions)
                     .receive()
                     .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                     .groupBy(m -> m.receiverOffset().topicPartition())
                     .flatMap(partitionFlux ->
                         partitionFlux.publishOn(scheduler)
                             .map(r -> processRecord(partitionFlux.key(), r))
                             .sample(Duration.ofMillis(5000))
                             .concatMap(offset -> offset.commit()));
    }

The reactive docs cover this in the getting-started section: nothing happens until you subscribe. In the code above you are building a reactive stream, but nobody ever subscribes to it.
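
A minimal, Kafka-free illustration of the same principle (using reactor.core.publisher.Flux):

    Flux<String> pipeline = Flux.just("a", "b")
        .doOnNext(v -> System.out.println("Saw: " + v));

    // Nothing has been printed yet: the lines above only describe the stream.
    pipeline.subscribe(); // now "Saw: a" and "Saw: b" are printed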

Since your application is the consumer of the stream, you should add a subscribe statement somewhere.

Personally, I would not return void (in reactive programming you generally try to avoid void methods, since they usually imply side effects and are hard to test); I would return the publisher all the way up to the main function so the code can be unit tested.
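
Here is a sketch of that refactoring, keeping the method names from the question (note that offset.commit() returns Mono<Void>, so the whole pipeline composes to a Flux<Void>, and reactor.core.publisher.Flux must be imported):

    // Sketch: build and return the stream; the caller decides when to subscribe.
    public Flux<Void> ReadProcessWriteRecords() {
        Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
        return KafkaReceiver.create(receiverOptions)
            .receive()
            .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
            .groupBy(m -> m.receiverOffset().topicPartition())
            .flatMap(partitionFlux ->
                partitionFlux.publishOn(scheduler)
                    .map(r -> processRecord(partitionFlux.key(), r))
                    .sample(Duration.ofMillis(5000))
                    .concatMap(offset -> offset.commit()));
    }

    // The static helper then passes the stream through unchanged.
    public static Flux<Void> RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(
            null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
        return transpose.ReadProcessWriteRecords();
    }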

The resulting main function would then look something like this:

    public static void main(String[] args) throws Exception {
        String sourceTopic = "metrics";
        String destinationTopic = "cleanmetrics";

        RunReactiveTranformProcess(sourceTopic, destinationTopic).subscribe();
    }