消息在 Confluent Kafka Dotnet 中丢失
Messages getting lost in Confluent Kafka Dotnet
我在最近的 c# 项目中使用了 Confluent kafka 包。我通过以下方式创建了一个生产者:
prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
foreach(msg in msglist){
using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
}
}
但问题是我的一些消息没有送达消费者。他们在某个地方迷路了。但是,如果我将 await 与生产者一起使用,那么所有消息都会被传递。如何在不等待的情况下传递我的所有消息。 (我只有一个分区)
首先,您应该只使用一个 Producer
来发送您的 msgList
,因为为每条消息创建一个新的 Producer
非常昂贵。
您可以做的是,将 Produce()
方法与 Flush()
结合使用。使用 Produce()
您将异步发送消息而无需等待响应。然后调用 Flush()
将阻塞,直到传递所有 in-flight 消息。
var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();
foreach (var msg in msglist)
{
producer.Produce(topic, new Message<Null, string> { Value = msg });
}
producer.Flush();
如果没有 await
或 Flush()
,您可能会丢失消息,因为您的生产者可能会在所有消息传递之前被处理掉。
我在最近的 c# 项目中使用了 Confluent kafka 包。我通过以下方式创建了一个生产者:
prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
foreach(msg in msglist){
using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
}
}
但问题是我的一些消息没有送达消费者。他们在某个地方迷路了。但是,如果我将 await 与生产者一起使用,那么所有消息都会被传递。如何在不等待的情况下传递我的所有消息。 (我只有一个分区)
首先,您应该只使用一个 Producer
来发送您的 msgList
,因为为每条消息创建一个新的 Producer
非常昂贵。
您可以做的是,将 Produce()
方法与 Flush()
结合使用。使用 Produce()
您将异步发送消息而无需等待响应。然后调用 Flush()
将阻塞,直到传递所有 in-flight 消息。
var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();
foreach (var msg in msglist)
{
producer.Produce(topic, new Message<Null, string> { Value = msg });
}
producer.Flush();
如果没有 await
或 Flush()
,您可能会丢失消息,因为您的生产者可能会在所有消息传递之前被处理掉。