无法从 Azure Function 发送到 Kafka 主题(Confluent Cloud)

Unable to send to Kafka topic (Confluent Cloud) from Azure Function

我的问题:

代码没有通过 ProduceAsync() 调用,也没有抛出任何异常。 Kafka 中没有关于该主题的消息出现。该功能不会失败,它只是永远不会完成,这意味着它甚至不会将状态记录到 Azure 门户从中获取它显示在 "Monitoring" 下的数据的任何地方。它不会因超过函数可以 运行 的最大时间量而触发任何异常。但是,该函数已执行,因为 ApplicationInsights 在其跟踪消息中显示了我的调试日志语句。

我尝试了几种不同的方法,但这是最新方法的代码(已更改名称以保护无辜者):

[FunctionName("MyFunction")]
public static async Task RunAsync(
    [TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
    ILogger log,
    ExecutionContext context,
    [HttpClientFactory]HttpClient httpClient
)
{
    try {
        // ...
        using (var kafkaProducer = initKafkaProducer(config))
        {
            var myHelper = new MyHelper(/*...*/, kafkaProducer);
            foreach (var obj in objects)
            {
                await myHelper.ProcessObject(obj);
            }
        }
    }
    catch(Exception ex) 
    {
        //...
        throw new Exception("My error message", ex);
    }
}

private static IProducer<Null, string> initKafkaProducer(IConfigurationRoot config)
{
    var pConfig = new ProducerConfig
    {
        BootstrapServers = config["BootstrapServers"],
        SaslMechanism = SaslMechanism.Plain,
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslUsername = config["Username"],
        SaslPassword = config["Password"],
        MessageSendMaxRetries = 10,
        RetryBackoffMs = 250,
        Acks = Acks.All,
        LingerMs = 5
    };

    return new ProducerBuilder<Null, string>(pConfig).Build();
}


public class MyHelper
{
    public async Task ProcessObject(MyObject obj)
    {
        try
        {
            //...
            await sendToKafkaTopic(obj);
            _log.LogInformation($"DEBUG: Successfully sent to Kafka");
            //...
        }
        catch (HttpRequestException ex)
        {
            throw new Exception("My error message...", ex);
        }
    }

    private async Task sendToKafkaTopic(MyObject obj)
    {
        string topic = _config["KafkaTopic"];
        var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(obj) };

        try
        {
            var deliveryResult = await _kafkaProducer.ProduceAsync(topic, message);
            _log.LogInformation($"DEBUG: Delivered the following to {deliveryResult.TopicPartitionOffset}:\n\n{deliveryResult.Value}");
        }
        catch (ProduceException<Null, string> e)
        {
            var error = e.Error;

            if (error.IsError && error.IsFatal)
            {
                string errorMessage = "FATAL Kafka error! ";
                if (error.IsBrokerError)
                {
                    errorMessage += "BrokerError. ";
                }
                else if (error.IsLocalError)
                {
                    errorMessage += "LocalError. ";
                }

                errorMessage += $"Error code: {error.Code}. Reason: {error.Reason}";
                throw new Exception(errorMessage);
            }
        }
    }
}

我尝试过的另一种方法是使用 Produce() 调用,使用它发送 DeliveryReport 的处理程序,并使用 _kafkaProducer.Flush(TimeSpan.FromSeconds(10)) 调用。这使它看起来一切顺利,但消息没有显示在 Kafka 中。

我也尝试过使用 ProduceAsync(topic, message).ContinueWith((t) => if(t.IsFaulted)...) 方法,这似乎工作了一段时间,但在随机调用函数后它停止工作,我必须重新部署应用程序才能获得它再次工作(可能是巧合,重新启动它不起作用,但重新部署却起作用了——但这种情况已经发生过好几次了)。

这是一个面向 .NET Core 2.1 的 V2 函数应用程序。我正在使用 Confluent.Kafka 版本 1.4.0 NuGet 包。 运行正在制定消费计划,我从 Visual Studio 发布。

似乎有很多事情会导致这些症状。正如我在对 OP 的评论中提到的那样,将 Confluent.Kafka 从 1.4.0 升级到 1.4.2 解决了我最初的问题。然而,几天后出现了完全相同的症状,发送到不同的主题并没有帮助。

为了找出发生了什么,我通过设置客户端配置 属性 Debug = "all" 启用了客户端的所有调试输出。这导致了功能应用程序实例的以下输出:

Level: Debug
Instance Name: rdkafka#producer-1
Facility: CERTROOT
Message: [thrd:app]: 38/38 certificate(s) successfully added from Windows Certificate Root store

以及函数应用程序实例中的以下内容不起作用:

Level: Debug
Instance Name: rdkafka#producer-25
Facility: CERTROOT
Message: [thrd:app]: Failed to open Windows certificate Root store: Access is denied...: falling back to OpenSSL default CA paths

Level: Debug
Instance Name: rdkafka#producer-25
Facility: BROKERFAIL
Message: [thrd:sasl_ssl://redacted/boot]: sasl_ssl://redacted/bootstrap: failed: err: Local: SSL error: (errno: No such file or directory)

the client issue tracker 找到了解决方法,但最初没有用。必须指定 cacert.pem 文件 (D:\home\site\wwwroot\cacert.pem) 的完整路径,并确保不将应用程序作为包部署。似乎最后一部分是这次的罪魁祸首,Azure 团队正在调查这一点。希望他们能解决这个问题,这样我们就不必使用和管理我们自己的 CA 证书文件了。

显然,在使用无服务器架构时必须指定根 CA 的路径是很常见的,但对于 Azure Functions,不应该 是必需的(但可能对某些人来说)。