使用 Assign() 时,Kafka .net 客户端没有收到任何消息
Kafka .net client does not receive any messages when using Assign()
我正在尝试让我的服务在开始时从头到尾重新读取 kafka 主题以初始化内部数据结构。我正在使用 Confluent .NET 客户端。
根据我的理解,以下代码应该订阅我的主题设置偏移量到开头:
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
但由于某种原因,我没有收到主题中已有的消息,也没有收到新消息。
我对 Assign() 方法的理解有误吗?有没有一种方法可以使用 Subscribe() 实现预期的结果,而无需使用 kafka CLI 硬重置偏移量?
这是完整的测试客户端,尽管主题有消息并且新消息正在到达,但我的输出始终是 'No messages...'。
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-consumer",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
var topic = "test-topic";
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
while (true)
{
var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result == null)
Console.WriteLine("No messages...");
else
Console.WriteLine($"Offset: {result.Offset}");
}
}
为什么要使用 Assign
?以下应该适合您:
public static void Main(string[] args)
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("test-topic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
问题是我使用 Partition.Any 调用了 Assign(),以下代码有效:
consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));
我正在尝试让我的服务在开始时从头到尾重新读取 kafka 主题以初始化内部数据结构。我正在使用 Confluent .NET 客户端。 根据我的理解,以下代码应该订阅我的主题设置偏移量到开头:
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
但由于某种原因,我没有收到主题中已有的消息,也没有收到新消息。 我对 Assign() 方法的理解有误吗?有没有一种方法可以使用 Subscribe() 实现预期的结果,而无需使用 kafka CLI 硬重置偏移量?
这是完整的测试客户端,尽管主题有消息并且新消息正在到达,但我的输出始终是 'No messages...'。
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-consumer",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
var topic = "test-topic";
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
while (true)
{
var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result == null)
Console.WriteLine("No messages...");
else
Console.WriteLine($"Offset: {result.Offset}");
}
}
为什么要使用 Assign
?以下应该适合您:
public static void Main(string[] args)
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("test-topic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
问题是我使用 Partition.Any 调用了 Assign(),以下代码有效:
consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));