合流的批量消费者。如果指定超时,消费者不工作

Confluent Batch Consumer. Consumer not working if Time out is specified

我正在尝试一次最多从 kafka 接收 1000 条消息。 (我这样做是因为我需要批量插入到 MSSQL 中。)我的印象是 kafka 保留了一个内部队列,它从代理那里获取消息,当我使用 consumer.consume() 方法时,它只是检查是否有是内部队列中的任何消息和 returns 如果它找到了什么。否则它只会阻塞直到内部队列更新或超时。

我尝试使用此处建议的解决方案:https://github.com/confluentinc/confluent-kafka-dotnet/issues/1164#issuecomment-610308425

但是当我指定 TimeSpan.Zero(或任何其他时间跨度不超过 1000 毫秒)时,消费者从不消费任何消息。但如果我删除超时,它确实会消耗消息,但如果没有更多消息可供读取,我将无法退出循环。

我还在 Whosebug 上看到了另一个问题,它建议读取发送到 kafka 的最后一条消息的偏移量,然后读取消息直到达到该偏移量,然后从循环中跳出。但目前我只有一个消费者和一个主题的 6 个分区。我还没有尝试过,但我认为管理每个分区的偏移量可能会使代码变得混乱。

有人可以告诉我该怎么做吗?

static List<RealTime> getBatch()
    {

        var config = new ConsumerConfig
        {
            BootstrapServers = ConfigurationManager.AppSettings["BootstrapServers"],
            GroupId = ConfigurationManager.AppSettings["ConsumerGroupID"],
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };

        List<RealTime> results = new List<RealTime>();
        List<string> malformedJson = new List<string>();

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("RealTimeTopic");
            int count = 0;

            while (count < batchSize)
            {
                var consumerResult = consumer.Consume(1000);
                
                if (consumerResult?.Message is null)
                {
                    break;
                }
                Console.WriteLine("read");
                try
                {
                    RealTime item = JsonSerializer.Deserialize<RealTime>(consumerResult.Message.Value);
                    results.Add(item);
                    count += 1;
                }
                catch(Exception e)
                {
                    Console.WriteLine("malformed");
                    malformedJson.Add(consumerResult.Message.Value);
                }
                
                
            }
            
            consumer.Close();

        };
        Console.WriteLine(malformedJson.Count);

        return results;
    }

我找到了解决方法。 由于某种原因,消费者首先需要在没有超时的情况下被调用。这意味着它将等待一条消息,直到它至少收到一条消息。之后使用 consume with timeout zero 从内部队列中一条一条地获取所有剩余的消息。这似乎是最好的。