Kafka 消费者丢失消息
Kafka consumer missing messages
在 2 个分区上有一个名为“WebMessages”的 Kafka 主题。
我们在同一台服务器上有两个使用者组,但在 IIS 上的不同站点上。
其中一个消费者无法接收消息。另一个错过了大部分消息。
我在本地电脑上写simple consumer的时候,也遗漏了一些消息。知道出了什么问题吗?
生产者代码如下:
_producerConfig = new ProducerConfig {
BootstrapServers = _addressWithPort,
Acks = Acks.All
};
using (var p = new ProducerBuilder<string, string>(_producerConfig).Build())
{
p.ProduceAsync(_topicName, new Message<string, string>
{
Key = ldtoKafkaMessage.Key,
Value = ldtoKafkaMessage.Message
}).ContinueWith(task =>
{
if (task.IsFaulted)
{
TraceController.TraceError(Common.Enums.TraceEventCategories.X, "Key|Message", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + task.Exception.Message + " " + task.Exception.InnerException+ " " + task.Exception.StackTrace);
}
else
{
TraceController.TraceInformation(Common.Enums.TraceEventCategories.X, "Key|Message|Result", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + "Success");
}
});
}
所以我确保我向生产者发送了消息。
这是消费者代码。
_consumerConfig = new ConsumerConfig
{
BootstrapServers = _addressWithPort,
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = consumerGroupId
};
using (var c = new ConsumerBuilder<string, string>(_consumerConfig).Build())
{
c.Subscribe(_topicName);
CancellationTokenSource cts = new CancellationTokenSource();
try
{
while (!cts.IsCancellationRequested)
{
try
{
var cr = c.Consume(cts.Token);
LdtoKafkaMessage.Key = cr.Key;
LdtoKafkaMessage.Message = cr.Value;
this.OnMessageChanged();
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
您的某些消息可能不会生成,因为您处理了生成器并且没有等待它完成。您可以通过在 ProduceAsync
方法上使用 await
关键字或在处置生产者之前调用 Flush()
来确保传递消息。
在 2 个分区上有一个名为“WebMessages”的 Kafka 主题。
我们在同一台服务器上有两个使用者组,但在 IIS 上的不同站点上。
其中一个消费者无法接收消息。另一个错过了大部分消息。
我在本地电脑上写simple consumer的时候,也遗漏了一些消息。知道出了什么问题吗?
生产者代码如下:
_producerConfig = new ProducerConfig {
BootstrapServers = _addressWithPort,
Acks = Acks.All
};
using (var p = new ProducerBuilder<string, string>(_producerConfig).Build())
{
p.ProduceAsync(_topicName, new Message<string, string>
{
Key = ldtoKafkaMessage.Key,
Value = ldtoKafkaMessage.Message
}).ContinueWith(task =>
{
if (task.IsFaulted)
{
TraceController.TraceError(Common.Enums.TraceEventCategories.X, "Key|Message", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + task.Exception.Message + " " + task.Exception.InnerException+ " " + task.Exception.StackTrace);
}
else
{
TraceController.TraceInformation(Common.Enums.TraceEventCategories.X, "Key|Message|Result", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + "Success");
}
});
}
所以我确保我向生产者发送了消息。
这是消费者代码。
_consumerConfig = new ConsumerConfig
{
BootstrapServers = _addressWithPort,
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = consumerGroupId
};
using (var c = new ConsumerBuilder<string, string>(_consumerConfig).Build())
{
c.Subscribe(_topicName);
CancellationTokenSource cts = new CancellationTokenSource();
try
{
while (!cts.IsCancellationRequested)
{
try
{
var cr = c.Consume(cts.Token);
LdtoKafkaMessage.Key = cr.Key;
LdtoKafkaMessage.Message = cr.Value;
this.OnMessageChanged();
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
您的某些消息可能不会生成,因为您处理了生成器并且没有等待它完成。您可以通过在 ProduceAsync
方法上使用 await
关键字或在处置生产者之前调用 Flush()
来确保传递消息。