无法从 Azure Function 发送到 Kafka 主题(Confluent Cloud)
Unable to send to Kafka topic (Confluent Cloud) from Azure Function
我的问题:
代码没有通过 ProduceAsync() 调用,也没有抛出任何异常。 Kafka 中没有关于该主题的消息出现。该功能不会失败,它只是永远不会完成,这意味着它甚至不会将状态记录到 Azure 门户从中获取它显示在 "Monitoring" 下的数据的任何地方。它不会因超过函数可以 运行 的最大时间量而触发任何异常。但是,该函数已执行,因为 ApplicationInsights 在其跟踪消息中显示了我的调试日志语句。
我尝试了几种不同的方法,但这是最新方法的代码(已更改名称以保护无辜者):
[FunctionName("MyFunction")]
public static async Task RunAsync(
[TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
ILogger log,
ExecutionContext context,
[HttpClientFactory]HttpClient httpClient
)
{
try {
// ...
using (var kafkaProducer = initKafkaProducer(config))
{
var myHelper = new MyHelper(/*...*/, kafkaProducer);
foreach (var obj in objects)
{
await myHelper.ProcessObject(obj);
}
}
}
catch(Exception ex)
{
//...
throw new Exception("My error message", ex);
}
}
private static IProducer<Null, string> initKafkaProducer(IConfigurationRoot config)
{
var pConfig = new ProducerConfig
{
BootstrapServers = config["BootstrapServers"],
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = config["Username"],
SaslPassword = config["Password"],
MessageSendMaxRetries = 10,
RetryBackoffMs = 250,
Acks = Acks.All,
LingerMs = 5
};
return new ProducerBuilder<Null, string>(pConfig).Build();
}
public class MyHelper
{
public async Task ProcessObject(MyObject obj)
{
try
{
//...
await sendToKafkaTopic(obj);
_log.LogInformation($"DEBUG: Successfully sent to Kafka");
//...
}
catch (HttpRequestException ex)
{
throw new Exception("My error message...", ex);
}
}
private async Task sendToKafkaTopic(MyObject obj)
{
string topic = _config["KafkaTopic"];
var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(obj) };
try
{
var deliveryResult = await _kafkaProducer.ProduceAsync(topic, message);
_log.LogInformation($"DEBUG: Delivered the following to {deliveryResult.TopicPartitionOffset}:\n\n{deliveryResult.Value}");
}
catch (ProduceException<Null, string> e)
{
var error = e.Error;
if (error.IsError && error.IsFatal)
{
string errorMessage = "FATAL Kafka error! ";
if (error.IsBrokerError)
{
errorMessage += "BrokerError. ";
}
else if (error.IsLocalError)
{
errorMessage += "LocalError. ";
}
errorMessage += $"Error code: {error.Code}. Reason: {error.Reason}";
throw new Exception(errorMessage);
}
}
}
}
我尝试过的另一种方法是使用 Produce() 调用,使用它发送 DeliveryReport 的处理程序,并使用 _kafkaProducer.Flush(TimeSpan.FromSeconds(10))
调用。这使它看起来一切顺利,但消息没有显示在 Kafka 中。
我也尝试过使用 ProduceAsync(topic, message).ContinueWith((t) => if(t.IsFaulted)...)
方法,这似乎工作了一段时间,但在随机调用函数后它停止工作,我必须重新部署应用程序才能获得它再次工作(可能是巧合,重新启动它不起作用,但重新部署却起作用了——但这种情况已经发生过好几次了)。
这是一个面向 .NET Core 2.1 的 V2 函数应用程序。我正在使用 Confluent.Kafka 版本 1.4.0 NuGet 包。 运行正在制定消费计划,我从 Visual Studio 发布。
似乎有很多事情会导致这些症状。正如我在对 OP 的评论中提到的那样,将 Confluent.Kafka 从 1.4.0 升级到 1.4.2 解决了我最初的问题。然而,几天后出现了完全相同的症状,发送到不同的主题并没有帮助。
为了找出发生了什么,我通过设置客户端配置 属性 Debug = "all"
启用了客户端的所有调试输出。这导致了功能应用程序实例的以下输出:
Level: Debug
Instance Name: rdkafka#producer-1
Facility: CERTROOT
Message: [thrd:app]: 38/38 certificate(s) successfully added from Windows Certificate Root store
以及函数应用程序实例中的以下内容不起作用:
Level: Debug
Instance Name: rdkafka#producer-25
Facility: CERTROOT
Message: [thrd:app]: Failed to open Windows certificate Root store: Access is denied...: falling back to OpenSSL default CA paths
Level: Debug
Instance Name: rdkafka#producer-25
Facility: BROKERFAIL
Message: [thrd:sasl_ssl://redacted/boot]: sasl_ssl://redacted/bootstrap: failed: err: Local: SSL error: (errno: No such file or directory)
从 the client issue tracker 找到了解决方法,但最初没有用。必须指定 cacert.pem 文件 (D:\home\site\wwwroot\cacert.pem
) 的完整路径,并确保不将应用程序作为包部署。似乎最后一部分是这次的罪魁祸首,Azure 团队正在调查这一点。希望他们能解决这个问题,这样我们就不必使用和管理我们自己的 CA 证书文件了。
显然,在使用无服务器架构时必须指定根 CA 的路径是很常见的,但对于 Azure Functions,不应该 是必需的(但可能对某些人来说)。
我的问题:
代码没有通过 ProduceAsync() 调用,也没有抛出任何异常。 Kafka 中没有关于该主题的消息出现。该功能不会失败,它只是永远不会完成,这意味着它甚至不会将状态记录到 Azure 门户从中获取它显示在 "Monitoring" 下的数据的任何地方。它不会因超过函数可以 运行 的最大时间量而触发任何异常。但是,该函数已执行,因为 ApplicationInsights 在其跟踪消息中显示了我的调试日志语句。
我尝试了几种不同的方法,但这是最新方法的代码(已更改名称以保护无辜者):
[FunctionName("MyFunction")]
public static async Task RunAsync(
[TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
ILogger log,
ExecutionContext context,
[HttpClientFactory]HttpClient httpClient
)
{
try {
// ...
using (var kafkaProducer = initKafkaProducer(config))
{
var myHelper = new MyHelper(/*...*/, kafkaProducer);
foreach (var obj in objects)
{
await myHelper.ProcessObject(obj);
}
}
}
catch(Exception ex)
{
//...
throw new Exception("My error message", ex);
}
}
private static IProducer<Null, string> initKafkaProducer(IConfigurationRoot config)
{
var pConfig = new ProducerConfig
{
BootstrapServers = config["BootstrapServers"],
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = config["Username"],
SaslPassword = config["Password"],
MessageSendMaxRetries = 10,
RetryBackoffMs = 250,
Acks = Acks.All,
LingerMs = 5
};
return new ProducerBuilder<Null, string>(pConfig).Build();
}
public class MyHelper
{
public async Task ProcessObject(MyObject obj)
{
try
{
//...
await sendToKafkaTopic(obj);
_log.LogInformation($"DEBUG: Successfully sent to Kafka");
//...
}
catch (HttpRequestException ex)
{
throw new Exception("My error message...", ex);
}
}
private async Task sendToKafkaTopic(MyObject obj)
{
string topic = _config["KafkaTopic"];
var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(obj) };
try
{
var deliveryResult = await _kafkaProducer.ProduceAsync(topic, message);
_log.LogInformation($"DEBUG: Delivered the following to {deliveryResult.TopicPartitionOffset}:\n\n{deliveryResult.Value}");
}
catch (ProduceException<Null, string> e)
{
var error = e.Error;
if (error.IsError && error.IsFatal)
{
string errorMessage = "FATAL Kafka error! ";
if (error.IsBrokerError)
{
errorMessage += "BrokerError. ";
}
else if (error.IsLocalError)
{
errorMessage += "LocalError. ";
}
errorMessage += $"Error code: {error.Code}. Reason: {error.Reason}";
throw new Exception(errorMessage);
}
}
}
}
我尝试过的另一种方法是使用 Produce() 调用,使用它发送 DeliveryReport 的处理程序,并使用 _kafkaProducer.Flush(TimeSpan.FromSeconds(10))
调用。这使它看起来一切顺利,但消息没有显示在 Kafka 中。
我也尝试过使用 ProduceAsync(topic, message).ContinueWith((t) => if(t.IsFaulted)...)
方法,这似乎工作了一段时间,但在随机调用函数后它停止工作,我必须重新部署应用程序才能获得它再次工作(可能是巧合,重新启动它不起作用,但重新部署却起作用了——但这种情况已经发生过好几次了)。
这是一个面向 .NET Core 2.1 的 V2 函数应用程序。我正在使用 Confluent.Kafka 版本 1.4.0 NuGet 包。 运行正在制定消费计划,我从 Visual Studio 发布。
似乎有很多事情会导致这些症状。正如我在对 OP 的评论中提到的那样,将 Confluent.Kafka 从 1.4.0 升级到 1.4.2 解决了我最初的问题。然而,几天后出现了完全相同的症状,发送到不同的主题并没有帮助。
为了找出发生了什么,我通过设置客户端配置 属性 Debug = "all"
启用了客户端的所有调试输出。这导致了功能应用程序实例的以下输出:
Level: Debug
Instance Name: rdkafka#producer-1
Facility: CERTROOT
Message: [thrd:app]: 38/38 certificate(s) successfully added from Windows Certificate Root store
以及函数应用程序实例中的以下内容不起作用:
Level: Debug
Instance Name: rdkafka#producer-25
Facility: CERTROOT
Message: [thrd:app]: Failed to open Windows certificate Root store: Access is denied...: falling back to OpenSSL default CA paths
Level: Debug
Instance Name: rdkafka#producer-25
Facility: BROKERFAIL
Message: [thrd:sasl_ssl://redacted/boot]: sasl_ssl://redacted/bootstrap: failed: err: Local: SSL error: (errno: No such file or directory)
从 the client issue tracker 找到了解决方法,但最初没有用。必须指定 cacert.pem 文件 (D:\home\site\wwwroot\cacert.pem
) 的完整路径,并确保不将应用程序作为包部署。似乎最后一部分是这次的罪魁祸首,Azure 团队正在调查这一点。希望他们能解决这个问题,这样我们就不必使用和管理我们自己的 CA 证书文件了。
显然,在使用无服务器架构时必须指定根 CA 的路径是很常见的,但对于 Azure Functions,不应该 是必需的(但可能对某些人来说)。