Kafka producer with ProduceAsync returns DeliveryResult, not DeliveryReport

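I am trying to write messages to Kafka, and my producer is shown below. When I use Produce, it accepts a delivery handler, so I can access the DeliveryReport. When I use ProduceAsync, however, the return type is DeliveryResult. How can I get the DeliveryReport and log the reason for a failure?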

Using Produce:

    public void WriteMessage(string message)
    {
        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {
            producer.Produce(this._topicName, new Message<string, string>()
            {
                Key = rand.Next(5).ToString(),
                Value = message
            },
            (deliveryReport) =>
            {
                if (deliveryReport.Error.Code != ErrorCode.NoError)
                {
                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                }
                else
                {
                    Console.WriteLine($"KAFKA => Delivered '{deliveryReport.Value}' to '{deliveryReport.TopicPartitionOffset}'");
                }
            });

            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }

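In the code above I can access DeliveryReport, which inherits from DeliveryResult, so I can read both the error reason (Error.Reason) and the TopicPartitionOffset that comes from DeliveryResult. Here is the metadata: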

    namespace Confluent.Kafka
    {
        //
        // Summary:
        //     The result of a produce request.
        public class DeliveryReport<TKey, TValue> : DeliveryResult<TKey, TValue>
        {
            public DeliveryReport();

            //
            // Summary:
            //     An error (or NoError) associated with the message.
            public Error Error { get; set; }
            //
            // Summary:
            //     The TopicPartitionOffsetError associated with the message.
            public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }
        }
    }

Using ProduceAsync:

    public async Task WriteAysncMessage(string message)
    {
        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {

            var deliveryReport = await producer.ProduceAsync(this._topicName, new Message<string, string>()
            {
                Key = rand.Next(5).ToString(),
                Value = message
            });

            
            producer.Flush(TimeSpan.FromSeconds(60));
        }
    }

In the ProduceAsync method above, how can I access the DeliveryReport so that I can log the error reason, as I do with Produce? When I await ProduceAsync, it returns a DeliveryResult, not a DeliveryReport.

Also, which is better for writing to Kafka: Produce or ProduceAsync?

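I think I found the solution. When delivery fails, ProduceAsync throws a ProduceException, and its Error property carries the reason: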

        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {
            try
            {
                var deliveryResult = await producer.ProduceAsync(this._topicName, new Message<string, string>()
                {
                    Key = rand.Next(5).ToString(),
                    Value = message
                });

                Console.WriteLine($"KAFKA => Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"Failed to deliver message: {e.Error.Reason}");
            }
            producer.Flush(TimeSpan.FromSeconds(60));
        }
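
For reference, here is a slightly fuller sketch of the same idea, not a definitive implementation. It assumes Confluent.Kafka 1.x or later; the broker address `localhost:9092`, the topic `demo-topic`, the 10-second message timeout, and the helper name `TrySendAsync` are all placeholders I made up for illustration. Besides `Error`, the `ProduceException` also exposes a `DeliveryResult` property describing the message that failed, which covers most of what the `DeliveryReport` gives you in the `Produce` callback.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProduceAsyncSketch
{
    // Hypothetical helper: sends one message and returns true on success.
    public static async Task<bool> TrySendAsync(
        IProducer<string, string> producer, string topic, string key, string value)
    {
        try
        {
            // The task completes only after the delivery outcome is known.
            DeliveryResult<string, string> result = await producer.ProduceAsync(
                topic, new Message<string, string> { Key = key, Value = value });

            Console.WriteLine(
                $"KAFKA => Delivered '{result.Value}' to '{result.TopicPartitionOffset}'");
            return true;
        }
        catch (ProduceException<string, string> e)
        {
            // e.Error plays the role of DeliveryReport.Error;
            // e.DeliveryResult describes the message that could not be delivered.
            Console.WriteLine(
                $"Failed to deliver message with key '{e.DeliveryResult.Key}': " +
                $"{e.Error.Code} - {e.Error.Reason} (fatal: {e.Error.IsFatal})");
            return false;
        }
    }

    public static async Task Main()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092", // assumed broker address
            MessageTimeoutMs = 10000             // assumed: give up after 10 s
        };

        // Build the producer once and reuse it for every message.
        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            bool delivered = await TrySendAsync(producer, "demo-topic", "key-1", "hello");
            Console.WriteLine($"Delivered: {delivered}");
        }
    }
}
```

Because the awaited task finishes only once the message has been delivered or has failed, the `Flush` call after the `await` has nothing left to wait for in this pattern. As for Produce versus ProduceAsync: awaiting each `ProduceAsync` call sends messages one at a time, so for high-throughput writes `Produce` with a delivery handler (followed by a `Flush` before disposing) is usually the better fit, while `ProduceAsync` is convenient when the caller needs the outcome of each individual message.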