使用 Assign() 时,Kafka .net 客户端没有收到任何消息

Kafka .net client does not receive any messages when using Assign()

我正在尝试让我的服务在开始时从头到尾重新读取 kafka 主题以初始化内部数据结构。我正在使用 Confluent .NET 客户端。 根据我的理解,以下代码应该订阅我的主题设置偏移量到开头:

consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));

但由于某种原因,我没有收到主题中已有的消息,也没有收到新消息。 我对 Assign() 方法的理解有误吗?有没有一种方法可以使用 Subscribe() 实现预期的结果,而无需使用 kafka CLI 硬重置偏移量?

这是完整的测试客户端,尽管主题有消息并且新消息正在到达,但我的输出始终是 'No messages...'。

    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "test-consumer",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };
        var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
        var topic = "test-topic";
        consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
                Console.WriteLine("No messages...");
            else
                Console.WriteLine($"Offset: {result.Offset}");
        }
    }

为什么要使用 Assign?以下应该适合您:

public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    { 
        GroupId = "test-consumer",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
    {
        c.Subscribe("test-topic");

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    var cr = c.Consume(cts.Token);
                    Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            c.Close();
        }
    }
}

问题是我使用 Partition.Any 调用了 Assign(),以下代码有效:
consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));