如何在 .Net 中使用 Confluent.Kafka 从特定的 TopicPartitionOffset 消费
How to Consume from specific TopicPartitionOffset with Confluent.Kafka in .Net
我需要我的消费者从特定的 TopicPartitionOffset(here from offset 278)
消费。假设 Messages 已经由某个 Producer 在 Specific topic 之前产生,例如 ="Test_1"
。
这是我的代码
using System;
using Confluent.Kafka;
public class ConsumingTest
{
public static void Main(string[] args)
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
};
using (var consumer = new Consumer<Null, string>(consumerConfig))
{
Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
consumer.Subscribe("Test_1");
var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));
consumer.Assign(topicPartitionOffset);
consumer.Seek(topicPartitionOffset);
while (true)
try
{
var cr = consumer.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine(e.Message);
}
}
}
}
在行 ----> var cr = consumer.Consume();
消费者消费但没有任何反应。什么问题。
我已经在 ConsumerConfig 中完成了 AutoOffsetReset = AutoOffsetResetType.Earliest
,Consumer Consumes All messages from all offsets 但是,这不是我想要的。
已解决:我找到了如下所述的解决方案:
- 添加了这个
consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)))
在尝试消费之前,
- 删除了这些
consumer.Subscribe("Test_1")
和 consumer.Seek(...)
所以更新后的代码是这样的,可以完美运行:
using (var consumer = new Consumer<Ignore, string>(config))
{
consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
while (true)
{
try
{
var consumeResult = consumer.Consume();
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error}");
}
}
consumer.Close();
}
我需要我的消费者从特定的 TopicPartitionOffset(here from offset 278)
消费。假设 Messages 已经由某个 Producer 在 Specific topic 之前产生,例如 ="Test_1"
。
这是我的代码
using System;
using Confluent.Kafka;
public class ConsumingTest
{
public static void Main(string[] args)
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
};
using (var consumer = new Consumer<Null, string>(consumerConfig))
{
Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
consumer.Subscribe("Test_1");
var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));
consumer.Assign(topicPartitionOffset);
consumer.Seek(topicPartitionOffset);
while (true)
try
{
var cr = consumer.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine(e.Message);
}
}
}
}
在行 ----> var cr = consumer.Consume();
消费者消费但没有任何反应。什么问题。
我已经在 ConsumerConfig 中完成了 AutoOffsetReset = AutoOffsetResetType.Earliest
,Consumer Consumes All messages from all offsets 但是,这不是我想要的。
已解决:我找到了如下所述的解决方案:
- 添加了这个
consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)))
在尝试消费之前,
- 删除了这些
consumer.Subscribe("Test_1")
和 consumer.Seek(...)
所以更新后的代码是这样的,可以完美运行:
using (var consumer = new Consumer<Ignore, string>(config))
{
consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
while (true)
{
try
{
var consumeResult = consumer.Consume();
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error}");
}
}
consumer.Close();
}