如何在 .Net 中使用 Confluent.Kafka 从特定的 TopicPartitionOffset 消费

How to Consume from specific TopicPartitionOffset with Confluent.Kafka in .Net

我需要我的消费者从特定的 TopicPartitionOffset(here from offset 278) 消费。假设 Messages 已经由某个 Producer 在 Specific topic 之前产生,例如 ="Test_1"。 这是我的代码

using System;
using Confluent.Kafka;

public class ConsumingTest
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
                                 {
                                     BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
                                 };

        using (var consumer = new Consumer<Null, string>(consumerConfig))
        {
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
            consumer.Subscribe("Test_1");

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            consumer.Seek(topicPartitionOffset);

            while (true)
                try
                {
                    var cr = consumer.Consume();

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(e.Message);
                }
        }
    }
}

在行 ----> var cr = consumer.Consume(); 消费者消费但没有任何反应。什么问题。

我已经在 ConsumerConfig 中完成了 AutoOffsetReset = AutoOffsetResetType.Earliest,Consumer Consumes All messages from all offsets 但是,这不是我想要的。

已解决:我找到了如下所述的解决方案:

  • 添加了这个

consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset))) 在尝试消费之前,

  • 删除了这些

consumer.Subscribe("Test_1")consumer.Seek(...)

所以更新后的代码是这样的,可以完美运行:

using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume();
                        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error}");
                    }
                }

                consumer.Close();
            }