合流的批量消费者。如果指定超时,消费者不工作
Confluent Batch Consumer. Consumer not working if Time out is specified
我正在尝试一次最多从 kafka 接收 1000 条消息。 (我这样做是因为我需要批量插入到 MSSQL 中。)我的印象是 kafka 保留了一个内部队列,它从代理那里获取消息,当我使用 consumer.consume() 方法时,它只是检查是否有是内部队列中的任何消息和 returns 如果它找到了什么。否则它只会阻塞直到内部队列更新或超时。
我尝试使用此处建议的解决方案:https://github.com/confluentinc/confluent-kafka-dotnet/issues/1164#issuecomment-610308425
但是当我指定 TimeSpan.Zero(或任何其他时间跨度不超过 1000 毫秒)时,消费者从不消费任何消息。但如果我删除超时,它确实会消耗消息,但如果没有更多消息可供读取,我将无法退出循环。
我还在 Whosebug 上看到了另一个问题,它建议读取发送到 kafka 的最后一条消息的偏移量,然后读取消息直到达到该偏移量,然后从循环中跳出。但目前我只有一个消费者和一个主题的 6 个分区。我还没有尝试过,但我认为管理每个分区的偏移量可能会使代码变得混乱。
有人可以告诉我该怎么做吗?
static List<RealTime> getBatch()
{
var config = new ConsumerConfig
{
BootstrapServers = ConfigurationManager.AppSettings["BootstrapServers"],
GroupId = ConfigurationManager.AppSettings["ConsumerGroupID"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
List<RealTime> results = new List<RealTime>();
List<string> malformedJson = new List<string>();
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("RealTimeTopic");
int count = 0;
while (count < batchSize)
{
var consumerResult = consumer.Consume(1000);
if (consumerResult?.Message is null)
{
break;
}
Console.WriteLine("read");
try
{
RealTime item = JsonSerializer.Deserialize<RealTime>(consumerResult.Message.Value);
results.Add(item);
count += 1;
}
catch(Exception e)
{
Console.WriteLine("malformed");
malformedJson.Add(consumerResult.Message.Value);
}
}
consumer.Close();
};
Console.WriteLine(malformedJson.Count);
return results;
}
我找到了解决方法。
由于某种原因,消费者首先需要在没有超时的情况下被调用。这意味着它将等待一条消息,直到它至少收到一条消息。之后使用 consume with timeout zero 从内部队列中一条一条地获取所有剩余的消息。这似乎是最好的。
我正在尝试一次最多从 kafka 接收 1000 条消息。 (我这样做是因为我需要批量插入到 MSSQL 中。)我的印象是 kafka 保留了一个内部队列,它从代理那里获取消息,当我使用 consumer.consume() 方法时,它只是检查是否有是内部队列中的任何消息和 returns 如果它找到了什么。否则它只会阻塞直到内部队列更新或超时。
我尝试使用此处建议的解决方案:https://github.com/confluentinc/confluent-kafka-dotnet/issues/1164#issuecomment-610308425
但是当我指定 TimeSpan.Zero(或任何其他时间跨度不超过 1000 毫秒)时,消费者从不消费任何消息。但如果我删除超时,它确实会消耗消息,但如果没有更多消息可供读取,我将无法退出循环。
我还在 Whosebug 上看到了另一个问题,它建议读取发送到 kafka 的最后一条消息的偏移量,然后读取消息直到达到该偏移量,然后从循环中跳出。但目前我只有一个消费者和一个主题的 6 个分区。我还没有尝试过,但我认为管理每个分区的偏移量可能会使代码变得混乱。
有人可以告诉我该怎么做吗?
static List<RealTime> getBatch()
{
var config = new ConsumerConfig
{
BootstrapServers = ConfigurationManager.AppSettings["BootstrapServers"],
GroupId = ConfigurationManager.AppSettings["ConsumerGroupID"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
List<RealTime> results = new List<RealTime>();
List<string> malformedJson = new List<string>();
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("RealTimeTopic");
int count = 0;
while (count < batchSize)
{
var consumerResult = consumer.Consume(1000);
if (consumerResult?.Message is null)
{
break;
}
Console.WriteLine("read");
try
{
RealTime item = JsonSerializer.Deserialize<RealTime>(consumerResult.Message.Value);
results.Add(item);
count += 1;
}
catch(Exception e)
{
Console.WriteLine("malformed");
malformedJson.Add(consumerResult.Message.Value);
}
}
consumer.Close();
};
Console.WriteLine(malformedJson.Count);
return results;
}
我找到了解决方法。 由于某种原因,消费者首先需要在没有超时的情况下被调用。这意味着它将等待一条消息,直到它至少收到一条消息。之后使用 consume with timeout zero 从内部队列中一条一条地获取所有剩余的消息。这似乎是最好的。