Kafka消费者提交线程安全
Kafka consumer commit thread-safety
我正在使用 Confluent.Kafka dotnet 客户端。
namespace Confluent.Kafka
{
public class Consumer<TKey, TValue> : IDisposable
{
public Task<CommittedOffsets> CommitAsync();
}
}
如您所见,Consumer.CommitAsync 是一个异步方法。在不等待其响应的情况下调用 CommitAsync
方法然后对 Subscribe
进行下一次调用是否安全?
示例代码如下。
using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer()))
{
consumer.Subscribe(topics);
while (true)
{
Message<MessageKey, byte[]> msg;
if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
{
// ...
if( msg.Offset % 100 == 0)
{
consumer.CommitAsync().ContinueWith((t) =>
{
// log t.Exception
}, TaskContinuationOptions.OnlyOnFaulted);
}
}
}
}
我假设您想说下一次调用 Consume
是的,很安全,没问题。
我还会为提交添加一些时间 window(例如在 5s 和 100msgs 之间最先出现的时间),这样如果您有一段时间没有收到消息,您仍然会提交它们
我正在使用 Confluent.Kafka dotnet 客户端。
namespace Confluent.Kafka
{
public class Consumer<TKey, TValue> : IDisposable
{
public Task<CommittedOffsets> CommitAsync();
}
}
如您所见,Consumer.CommitAsync 是一个异步方法。在不等待其响应的情况下调用 CommitAsync
方法然后对 Subscribe
进行下一次调用是否安全?
示例代码如下。
using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer()))
{
consumer.Subscribe(topics);
while (true)
{
Message<MessageKey, byte[]> msg;
if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
{
// ...
if( msg.Offset % 100 == 0)
{
consumer.CommitAsync().ContinueWith((t) =>
{
// log t.Exception
}, TaskContinuationOptions.OnlyOnFaulted);
}
}
}
}
我假设您想说下一次调用 Consume
是的,很安全,没问题。 我还会为提交添加一些时间 window(例如在 5s 和 100msgs 之间最先出现的时间),这样如果您有一段时间没有收到消息,您仍然会提交它们