马斯克,将军,依卡夫卡JavaApi

MSK, IAM, and Kafka Java Api

所以出于某种原因,我无法通过 Kafka Java API 与 MSK 建立正确的连接。我可以使用 conduktor 和 Kafka CLI 工具让 producers/consumers 与 MSK 一起工作。但是,当我尝试连接我的 Scala 代码时,我无法让它工作。所以我使用如下配置通过 conduktor 和 Kafka CLI 工具进行连接。

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandle

对于我的 Scala 应用程序,我正在设置 producers/consumers 使用类似的模式

def props: Properties = {
   val p = new Properties()
   ....
   p.setProperty("security.protocol", "SASL_SSL")
   p.setProperty("sasl.mechanism", "AWS_MSK_IAM")
   p.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
   p.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
   p
}

val PRODUCER = new KafkaConsumer[AnyRef, AnyRef](props)

所以当我省略安全配置行和 运行 针对 Kafka 的本地实例时代码有效但是当我尝试点击 MSK 时它似乎没有构建消费者并且我得到了以下错误。

java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.

但是,本地 运行ning 实例可以工作。所以这让我觉得我没有在配置中正确设置一些东西来连接到 MSK。

我正在尝试遵循以下 tutorial 并且我使用的是 Scala 2.11 和 Kafka 2.41 版。我还将 aws-msk-iam-auth 添加到我的 build.sbt (1.1.0)。有什么想法或解决方案吗?

事实证明这不是我的 AWS 连接的问题,因为我按照 here 的解释实施了一些日志记录。我的问题在于我本地 运行 版本的 Kafka 和 MSK 之间的差异。我仍在努力了解差异。