Redis 使用 Java 为每个消费者流式传输一条消息

Redis Streams one message per consumer with Java

我正在尝试使用 redis 流实现一个 java 应用程序,其中每个消费者只消费一条消息。就像 pipeline/queue 一样,每个消费者只接受一条消息,对其进行处理,并在完成后消费者接受流中到目前为止尚未处理的下一条消息。 有效的是每条消息都被一个消费者消费(使用 xreadgroup)。

我从 this tutorial from redislabs

开始

代码:

    RedisClient redisClient = RedisClient.create("redis://pw@host:port");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisCommands<String, String> syncCommands = connection.sync();

    try {
        syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
    } catch (RedisBusyException redisBusyException) {
        System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
    }

    System.out.println("Waiting for new messages ");

    while (true) {
        List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
                Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));

        if (!messages.isEmpty()) {
            System.out.println(messages.size()); // 
            for (StreamMessage<String, String> message : messages) {
                System.out.println(message.getId());
                Thread.sleep(5000);
                syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
            }
        }

    }

我目前的问题是,一个消费者从队列中获取的消息多于一条,在某些情况下,其他消费者正在等待,而一个消费者一次处理 10 条消息。

提前致谢!

注意 XREADGROUP 可以获得 COUNT 个参数。

通过传递 XReadArgs,在 Lettuce xreadgroup 中查看 JavaDoc 如何做到这一点。