马斯克,将军,依卡夫卡JavaApi
MSK, IAM, and Kafka Java Api
所以出于某种原因,我无法通过 Kafka Java API 与 MSK 建立正确的连接。我可以使用 conduktor 和 Kafka CLI 工具让 producers/consumers 与 MSK 一起工作。但是,当我尝试连接我的 Scala 代码时,我无法让它工作。所以我使用如下配置通过 conduktor
和 Kafka CLI 工具进行连接。
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandle
对于我的 Scala 应用程序,我正在设置 producers/consumers 使用类似的模式
def props: Properties = {
val p = new Properties()
....
p.setProperty("security.protocol", "SASL_SSL")
p.setProperty("sasl.mechanism", "AWS_MSK_IAM")
p.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
p.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
p
}
val PRODUCER = new KafkaConsumer[AnyRef, AnyRef](props)
所以当我省略安全配置行和 运行 针对 Kafka 的本地实例时代码有效但是当我尝试点击 MSK 时它似乎没有构建消费者并且我得到了以下错误。
java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
但是,本地 运行ning 实例可以工作。所以这让我觉得我没有在配置中正确设置一些东西来连接到 MSK。
我正在尝试遵循以下 tutorial 并且我使用的是 Scala 2.11 和 Kafka 2.41 版。我还将 aws-msk-iam-auth
添加到我的 build.sbt (1.1.0
)。有什么想法或解决方案吗?
事实证明这不是我的 AWS 连接的问题,因为我按照 here 的解释实施了一些日志记录。我的问题在于我本地 运行 版本的 Kafka 和 MSK 之间的差异。我仍在努力了解差异。
所以出于某种原因,我无法通过 Kafka Java API 与 MSK 建立正确的连接。我可以使用 conduktor 和 Kafka CLI 工具让 producers/consumers 与 MSK 一起工作。但是,当我尝试连接我的 Scala 代码时,我无法让它工作。所以我使用如下配置通过 conduktor
和 Kafka CLI 工具进行连接。
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandle
对于我的 Scala 应用程序,我正在设置 producers/consumers 使用类似的模式
def props: Properties = {
val p = new Properties()
....
p.setProperty("security.protocol", "SASL_SSL")
p.setProperty("sasl.mechanism", "AWS_MSK_IAM")
p.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
p.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
p
}
val PRODUCER = new KafkaConsumer[AnyRef, AnyRef](props)
所以当我省略安全配置行和 运行 针对 Kafka 的本地实例时代码有效但是当我尝试点击 MSK 时它似乎没有构建消费者并且我得到了以下错误。
java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
但是,本地 运行ning 实例可以工作。所以这让我觉得我没有在配置中正确设置一些东西来连接到 MSK。
我正在尝试遵循以下 tutorial 并且我使用的是 Scala 2.11 和 Kafka 2.41 版。我还将 aws-msk-iam-auth
添加到我的 build.sbt (1.1.0
)。有什么想法或解决方案吗?
事实证明这不是我的 AWS 连接的问题,因为我按照 here 的解释实施了一些日志记录。我的问题在于我本地 运行 版本的 Kafka 和 MSK 之间的差异。我仍在努力了解差异。