如何 运行 Amazon MSK 上的 Kafka Camel 连接器

How To Run Kafka Camel Connectors On Amazon MSK

上下文: 我遵循了这个 link on setting up AWS MSK and testing a producer and consumer 并且它已设置并正常工作。我能够通过 2 个独立的 EC2 实例发送和接收消息,这两个实例都使用相同的 Kafka 集群(我的 MSK 集群)。现在,我想建立一个从 Eventhubs 到 AWS Firehose 的数据管道,它遵循以下形式:

Azure Eventhub -> Eventhub 到 Kafka Camel 连接器 -> AWS MSK -> Kafka-to-Kinesis-Firehose Camel连接器 -> AWS Kinesis Firehose

我能够在不使用 MSK 的情况下(通过常规的旧 Kafka)成功地做到这一点,但是由于未说明的原因现在需要使用 MSK 而我无法让它工作。

问题: 尝试 启动 AWS MSK 和我正在使用的两个 Camel 连接器之间的连接器时,出现以下错误:

这些是有问题的两个连接器:

  1. AWS Kinesis Firehose to Kafka Connector (Kafka -> Consumer)
  2. Azure Eventhubs to Kafka Connector (Producer -> Kafka)

目标: 让这些连接器与 MSK 一起工作,就像他们在没有它的情况下直接与 Kafka 一起工作时一样。

这是 Firehose 的问题:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid

这是 Azure 的:

[2021-05-04 14:09:56,848] WARN Load balancing for event processor failed - If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
        at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)

MSK 不提供 Kafka Connect 作为服务。您需要将其安装在您自己的计算机或其他 AWS 计算资源上。从那里,您需要安装 Camel 连接器插件

Kafka Connect 是一个与 Kafka(MSK、开源或任何其他 Kafka 发行版)一起工作的框架。但是,它不附带任何连接器。 Kafka Connect 与开源 kafka 捆绑在一起。

作为最佳实践,永远不要 运行 kafka 连接到与代理节点相同的服务器上。因为他们共享二进制文件。调整代理可能会导致卡夫卡代理出现意外问题。此外,Kafka Connect 应用程序是应用程序,您不会 运行 您的 kafka 消费者或生产者应用程序位于相同的节点上。因此,创建一个 EC2 实例并在那里部署 kafka 连接。

即将使用 TLS - 如果您启用客户端 TLS 身份验证 - 您需要查找 boostrap_broker_tls.