Kafka:使用手动批处理使用分区 - 消息被跳过

Kafka: Consume partition with manual batching - Messages are being skipped

我正在使用 Confluent Kafka .NET 为分区主题创建消费者。

由于 Confluent Kafka .NET 不支持批量消费,我构建了一个消费消息的函数,直到达到批量大小。此函数的想法是仅使用来自同一分区的消息构建批处理,这就是为什么一旦我使用了具有不同分区的结果并且 return 我能够使用的任何数量的消息,我就停止构建批处理到目前为止。

目标或 Objective: 我希望能够处理我在批处理中 return 编辑的消息,并提交这些消息的偏移量只要。即:

Message Consumed From Partition Offset Stored in Batch
0 0 Yes
0 1 Yes
2 0 No

根据上面的 table,我想处理从分区 0 收到的两条消息。来自分区 2 的消息将被忽略,并且(希望)稍后在对 ConsumeBatch 的另一个调用中获取.

为了提交,我只需调用同步 Commit 函数,将我处理的最新消息的偏移量作为参数传递。在这种情况下,我将传递上面 table 中显示的批处理的第二条消息的偏移量(分区 0 - 偏移量 1)。

问题:

问题是,由于某种原因,当我构建如上所示的批处理时,由于验证而决定不处理的消息将被永远忽略。即:分区 2 的消息 0 将永远不会被消费者再次拾取。

正如您在下面的消费者配置中看到的,我已将 EnableAutoCommitEnableAutoOffsetStore 都设置为 false。我认为这足以让消费者不对偏移量做任何事情并且能够在另一个 Consume 调用中获取被忽略的消息,但事实并非如此。无论我的配置如何,偏移量都会以某种方式增加到每个分区的最新消费消息。

任何人都可以告诉我我在这里缺少什么来实现所需的行为吗?

建批功能的简化版:

public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
    List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();

    int latestPartition = -1; // The partition from where we consumed the last message

    for (int i = 0; i < batchSize; i++)
    {
        var result = _consumer.Consume(100);
        
        if (result != null)
        {
            if (latestPartition == -1 || result.Partition.Value == latestPartition)
            {
                consumedMessages.Add(result);
                latestPartition = result.Partition.Value;
            }
            else
                break;
        }
        else
            break;
    }

    return consumedMessages;
}

ConsumerConfig 用于实例化我的消费者客户端:

_consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _bootstrapServers,
            EnableAutoCommit = false,
            AutoCommitIntervalMs = 0,
            GroupId = "WorkerConsumers",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoOffsetStore = false,
        };

附加信息: 正在测试:

关键是使用 Seek 函数将分区的偏移量重置为特定位置,以便可以将被忽略的消息作为另一批次的一部分再次拾取。

在上面的相同函数中:

public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
    List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();

    int latestPartition = -1; // The partition from where we consumed the last message

    for (int i = 0; i < batchSize; i++)
    {
        var result = _consumer.Consume(100);
    
        if (result != null)
        {
            if (latestPartition == -1 || result.Partition.Value == latestPartition)
            {
                consumedMessages.Add(result);
                latestPartition = result.Partition.Value;
            }
            else
            {
                // This call will guarantee that this message that will not be included in the current batch, will be included in another batch later
                _consumer.Seek(result.TopicPartitionOffset); // IMPORTANT LINE!!!!!!!
                break;
            }
        }
        else
            break;
    }

    return consumedMessages;
}

我想一般来说,如果你想在不以任何方式改变偏移量的情况下使用消息(有点偷看主题分区),你可以调用 Consume 然后使用 Seek(result.TopicPartitionOffset) 来设置该主题分区的偏移量回到使用消息之前的位置。