RabbitMQ 流
RabbitMQ Streams
使用此文档作为参考:https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application我在 RabbitMQ 中创建了一个流并向其中添加了 100 万条消息。
try (Environment environment = Environment.builder().uri("rabbitmq-stream://guest:guest@localhost:5552")
.build()) {
environment.streamCreator().stream("tenth-application-stream").create();
Producer producer = environment.producerBuilder().stream("tenth-application-stream")
.build();
int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
Message message = producer.messageBuilder().properties().creationTime(System.currentTimeMillis())
.messageId(i).messageBuilder().properties().groupId(String.valueOf(i)).messageBuilder()
.addData(String.valueOf(i).getBytes(StandardCharsets.UTF_8)).build();
producer.send(message, confirmationStatus -> confirmLatch.countDown());
});
try {
boolean done = confirmLatch.await(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (StreamException e) {
System.out.println(e);
}
我正在尝试通过执行以下操作来回读流中的所有消息:
Consumer consumer = environment.consumerBuilder().stream("tenth-application-stream")
.offset(OffsetSpecification.first()).messageHandler((context, message) -> {
System.out.println(message.getBody().toString());
}).build();
但是,这最多只能打印消息 499。如何查看流中的所有消息?
由于您的客户端已经阅读了 499 条消息,我们知道有些消息是由生产者发布的。您是否已验证队列中实际上有超过 499 条消息?这可以使用 RabbitMQ Web 管理器来完成。 Steam 队列的一个很好的特性是消息可以在消费者阅读后保留在队列中。
如果发布了其余的消息,那么很可能消费者在所有消息被消费之前就关闭了它的连接。博客(repo here)中的例子使用Utils.waitAtMost方法延迟关闭连接:
Utils.waitAtMost(60, () -> messageConsumed.get() >= 1_000_000);
这会等到 60 秒过去或 1M 消息被消耗。或者,您可以使用 CountDownLatch 方法保持连接打开,直到达到目标。
这些选项似乎都不适合生产用例,但它们很适合概念验证。
使用此文档作为参考:https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application我在 RabbitMQ 中创建了一个流并向其中添加了 100 万条消息。
try (Environment environment = Environment.builder().uri("rabbitmq-stream://guest:guest@localhost:5552")
.build()) {
environment.streamCreator().stream("tenth-application-stream").create();
Producer producer = environment.producerBuilder().stream("tenth-application-stream")
.build();
int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
Message message = producer.messageBuilder().properties().creationTime(System.currentTimeMillis())
.messageId(i).messageBuilder().properties().groupId(String.valueOf(i)).messageBuilder()
.addData(String.valueOf(i).getBytes(StandardCharsets.UTF_8)).build();
producer.send(message, confirmationStatus -> confirmLatch.countDown());
});
try {
boolean done = confirmLatch.await(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (StreamException e) {
System.out.println(e);
}
我正在尝试通过执行以下操作来回读流中的所有消息:
Consumer consumer = environment.consumerBuilder().stream("tenth-application-stream")
.offset(OffsetSpecification.first()).messageHandler((context, message) -> {
System.out.println(message.getBody().toString());
}).build();
但是,这最多只能打印消息 499。如何查看流中的所有消息?
由于您的客户端已经阅读了 499 条消息,我们知道有些消息是由生产者发布的。您是否已验证队列中实际上有超过 499 条消息?这可以使用 RabbitMQ Web 管理器来完成。 Steam 队列的一个很好的特性是消息可以在消费者阅读后保留在队列中。
如果发布了其余的消息,那么很可能消费者在所有消息被消费之前就关闭了它的连接。博客(repo here)中的例子使用Utils.waitAtMost方法延迟关闭连接:
Utils.waitAtMost(60, () -> messageConsumed.get() >= 1_000_000);
这会等到 60 秒过去或 1M 消息被消耗。或者,您可以使用 CountDownLatch 方法保持连接打开,直到达到目标。
这些选项似乎都不适合生产用例,但它们很适合概念验证。