在远程 MSK kafka 集群上使用 kafka connect mongoDB debezium souce 连接器

use kafka connect mongoDB debezium souce connector on a remote MSK kafka cluster

我想从 MongoDB 中读取数据到 Kafka 的主题中。我设法通过使用以下连接器属性文件在本地完成这项工作:

name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=kafka:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1

connect worker 具有以下配置:

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


zookeeper.connect=localhost:2181

rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=localhost:9092

这在我本地的 kafka 中完美运行。我想 运行 它在远程 MSK Kafka 集群上。 由于 kafka MSK 中没有对新的 kafka connect 插件的内置支持,我在使我的 kafka connect source mongo 插件工作时遇到困难,要从我的本地机器导出连接器,我进行了以下修改: 在连接器属性级别:

name=mongodb-source-connectorszes
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=test/localhost:27017  #keeping the same local mongo
    database.history.kafka.bootstrap.servers=remote-msk-kakfa-brokers:9092
    mongodb.name=mongo_conn
    database.whitelist=test
    initial.sync.max.threads=1
    tasks.max=1

在 connect worker 级别,我进行了以下修改:

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


zookeeper.connect=remote-msk-kakfa-zookeeper:9092:2181

rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=remote-msk-kakfa-brokers:9092:9092

但似乎这还不够,因为我收到以下错误:

[2020-01-31 11:58:01,619] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 118 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)
[2020-01-31 11:58:01,731] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 119 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)

通常,我设法从我的本地计算机请求 Kafka MSK 集群(通过使用 VPN 和 sshuttle 到 EC2 实例)。例如,列出远程 kafka msk 集群中的主题。我只需要做:

bin/kafka-topics.sh --list --zookeeper  remote-zookeeper-server:2181

通过转到我本地的 kafka 安装文件夹。

并且此命令完美运行,无需在我的本地计算机中更改 server.properties。知道如何解决这个问题以便将 kafka Debezium mongo 源导出到 kafka MSK 集群。

建议使用 运行 Connect/Debezium

的连接分布式脚本和属性

任何说 zookeeper.connect 的东西都应该被删除(只有 Kafka 代理使用它)。任何显示 bootstrap 服务器的内容都应指向 MSK 为您提供的地址。

如果您遇到连接错误,请务必检查防火墙/VPC 设置