Kafka producer with ProduceAsync returns DeliveryResult, not DeliveryReport
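I am trying to write messages to Kafka, and my producer is shown below. When I use Produce, it takes a delivery handler and I can access the DeliveryReport. When I use ProduceAsync, the return type is DeliveryResult. How can I get the DeliveryReport and log the reason for a failure?
Using Produce: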
public void WriteMessage(string message)
{
    using (var producer = new ProducerBuilder<string, string>(this._config).Build())
    {
        producer.Produce(this._topicName, new Message<string, string>()
        {
            Key = rand.Next(5).ToString(),
            Value = message
        },
        (deliveryReport) =>
        {
            if (deliveryReport.Error.Code != ErrorCode.NoError)
            {
                Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
            }
            else
            {
                Console.WriteLine($"KAFKA => Delivered '{deliveryReport.Value}' to '{deliveryReport.TopicPartitionOffset}'");
            }
        });
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
In the code above I get a DeliveryReport, which inherits from DeliveryResult, so I can read both the error reason and the TopicPartitionOffset that DeliveryResult provides. Here is the metadata:
namespace Confluent.Kafka
{
    //
    // Summary:
    //     The result of a produce request.
    public class DeliveryReport<TKey, TValue> : DeliveryResult<TKey, TValue>
    {
        public DeliveryReport();
        //
        // Summary:
        //     An error (or NoError) associated with the message.
        public Error Error { get; set; }
        //
        // Summary:
        //     The TopicPartitionOffsetError associated with the message.
        public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }
    }
}
Using ProduceAsync:
public async Task WriteAysncMessage(string message)
{
    using (var producer = new ProducerBuilder<string, string>(this._config).Build())
    {
        var deliveryReport = await producer.ProduceAsync(this._topicName, new Message<string, string>()
        {
            Key = rand.Next(5).ToString(),
            Value = message
        });
        producer.Flush(TimeSpan.FromSeconds(60));
    }
}
In the ProduceAsync version above, how do I access a DeliveryReport so I can log the error reason the same way I do with Produce? When I await ProduceAsync, it returns a DeliveryResult, not a DeliveryReport.
Also, which is better for writing to Kafka: Produce or ProduceAsync?
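I think I found the solution. ProduceAsync does not hand back a DeliveryReport; when delivery fails it throws a ProduceException<TKey, TValue>, and the error is available on the exception's Error property: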
using (var producer = new ProducerBuilder<string, string>(this._config).Build())
{
    try
    {
        var deliveryResult = await producer.ProduceAsync(this._topicName, new Message<string, string>()
        {
            Key = rand.Next(5).ToString(),
            Value = message
        });
        Console.WriteLine($"KAFKA => Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
    }
    catch (ProduceException<string, string> e)
    {
        Console.WriteLine($"Failed to deliver message: {e.Error.Reason}");
    }
    producer.Flush(TimeSpan.FromSeconds(60));
}
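On the Produce vs. ProduceAsync question, here is a rough sketch of how the try/catch approach might be packaged so the producer is built once and reused instead of being created for every message. The KafkaWriter class, its constructor parameters, and the WriteAsync method are hypothetical names made up for illustration; only ProducerBuilder, ProduceAsync, ProduceException, Flush and Dispose come from Confluent.Kafka. It assumes a ProducerConfig is passed in.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical wrapper: class and member names are illustrative, not part of Confluent.Kafka.
public sealed class KafkaWriter : IDisposable
{
    private readonly IProducer<string, string> _producer;
    private readonly string _topicName;

    public KafkaWriter(ProducerConfig config, string topicName)
    {
        // Build the producer once and reuse it for every message.
        _producer = new ProducerBuilder<string, string>(config).Build();
        _topicName = topicName;
    }

    // Returns true if the message was delivered, false if delivery failed.
    public async Task<bool> WriteAsync(string key, string value)
    {
        try
        {
            var result = await _producer.ProduceAsync(
                _topicName,
                new Message<string, string> { Key = key, Value = value });
            Console.WriteLine($"KAFKA => Delivered '{result.Value}' to '{result.TopicPartitionOffset}'");
            return true;
        }
        catch (ProduceException<string, string> e)
        {
            // e.Error holds the error code and reason for the failed delivery.
            Console.WriteLine($"Failed to deliver message: {e.Error.Reason} (code {e.Error.Code})");
            return false;
        }
    }

    public void Dispose()
    {
        // Give any messages still queued a chance to go out, then release the client.
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}
```

Awaiting each ProduceAsync call one at a time means every message waits for its own acknowledgement, so this shape probably suits low-volume or request-driven code. For high-throughput paths, Produce with a delivery handler (as in the first snippet) is usually the better fit, since it queues the message and reports the outcome later through the callback.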