RabbitMQ 流

RabbitMQ Streams

使用此文档作为参考:https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application我在 RabbitMQ 中创建了一个流并向其中添加了 100 万条消息。

try (Environment environment = Environment.builder().uri("rabbitmq-stream://guest:guest@localhost:5552")
                .build()) {

            environment.streamCreator().stream("tenth-application-stream").create();
            Producer producer = environment.producerBuilder().stream("tenth-application-stream") 
                    .build();

        int messageCount = 1_000_000;

        CountDownLatch confirmLatch = new CountDownLatch(messageCount);
        IntStream.range(0, messageCount).forEach(i -> {
            Message message = producer.messageBuilder().properties().creationTime(System.currentTimeMillis())
                    .messageId(i).messageBuilder().properties().groupId(String.valueOf(i)).messageBuilder()
                    .addData(String.valueOf(i).getBytes(StandardCharsets.UTF_8)).build();
            producer.send(message, confirmationStatus -> confirmLatch.countDown());
        });
        try {
            boolean done = confirmLatch.await(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } catch (StreamException e) {
        System.out.println(e);
    }

我正在尝试通过执行以下操作来回读流中的所有消息:

    Consumer consumer = environment.consumerBuilder().stream("tenth-application-stream")
            .offset(OffsetSpecification.first()).messageHandler((context, message) -> {
                System.out.println(message.getBody().toString());

            }).build();

但是,这最多只能打印消息 499。如何查看流中的所有消息?

由于您的客户端已经阅读了 499 条消息,我们知道有些消息是由生产者发布的。您是否已验证队列中实际上有超过 499 条消息?这可以使用 RabbitMQ Web 管理器来完成。 Steam 队列的一个很好的特性是消息可以在消费者阅读后保留在队列中。

如果发布了其余的消息,那么很可能消费者在所有消息被消费之前就关闭了它的连接。博客(repo here)中的例子使用Utils.waitAtMost方法延迟关闭连接:

Utils.waitAtMost(60, () -> messageConsumed.get() >= 1_000_000);

这会等到 60 秒过去或 1M 消息被消耗。或者,您可以使用 CountDownLatch 方法保持连接打开,直到达到目标。

这些选项似乎都不适合生产用例,但它们很适合概念验证。