Use the Kafka Connect MongoDB Debezium source connector on a remote MSK Kafka cluster
I want to read data from MongoDB into a Kafka topic. I managed to get this working locally with the following connector properties file:
name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=kafka:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1
The Connect worker has the following configuration:
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
zookeeper.connect=localhost:2181
rest.port=18083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=localhost:9092
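For completeness, this is how I launch the whole thing locally in standalone mode; the two file names are just placeholders for the worker and connector properties files shown above:

# file names are placeholders for the two files above
bin/connect-standalone.sh connect-worker.properties mongodb-source-connector.properties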
This works perfectly with my local Kafka. Now I want to run it against a remote MSK Kafka cluster.
Since MSK has no built-in support for adding new Kafka Connect plugins, I am having trouble getting my Kafka Connect MongoDB source plugin to work there. To point the connector at the remote cluster from my local machine, I made the following changes.
At the connector properties level:
name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# keeping the same local Mongo
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=remote-msk-kakfa-brokers:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1
At the Connect worker level, I made the following changes:
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
zookeeper.connect=remote-msk-kakfa-zookeeper:9092:2181
rest.port=18083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=remote-msk-kakfa-brokers:9092:9092
But it seems this is not enough, because I get the following errors:
[2020-01-31 11:58:01,619] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 118 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)
[2020-01-31 11:58:01,731] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 119 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)
Normally, I can reach the MSK Kafka cluster from my local machine (through a VPN plus sshuttle to an EC2 instance). For example, to list the topics of the remote MSK cluster, I only need to run the following from my local Kafka installation folder:
bin/kafka-topics.sh --list --zookeeper remote-zookeeper-server:2181
This command works perfectly, without changing server.properties on my local machine. Any idea how to fix this, so that I can export the Kafka Debezium MongoDB source connector to the MSK cluster?
It's recommended to use the connect-distributed script and properties to run Connect/Debezium.
Anything that says zookeeper.connect should be removed (only Kafka brokers use that setting). Anything that sets bootstrap servers should point to the addresses that MSK gives you.
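As a minimal sketch, a distributed worker config for MSK could look like this (the broker address reuses the placeholder from your question; the group id, internal topic names, and replication factors are assumptions to adapt to your cluster):

bootstrap.servers=remote-msk-kakfa-brokers:9092
group.id=debezium-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# in distributed mode Connect stores offsets/configs/status in Kafka topics,
# instead of the local file used by standalone mode
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
rest.port=18083
plugin.path=/usr/share/java/test

You would start it with bin/connect-distributed.sh connect-distributed.properties, then register the connector through the REST API instead of a properties file, reusing the settings from your question:

curl -X POST -H "Content-Type: application/json" http://localhost:18083/connectors -d '{
  "name": "mongodb-source-connectorszes",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "test/localhost:27017",
    "mongodb.name": "mongo_conn",
    "database.whitelist": "test",
    "initial.sync.max.threads": "1",
    "tasks.max": "1"
  }
}'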
If you get connection errors, be sure to check your firewall / VPC settings.
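A quick sanity check is to confirm that the machine running Connect can actually reach the brokers. The cluster ARN below is a placeholder, and the bootstrap address reuses the one from your question:

# fetch the real bootstrap broker string for the MSK cluster (ARN is a placeholder)
aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:<region>:<account>:cluster/<name>/<uuid>

# list topics against the brokers directly, rather than through ZooKeeper
bin/kafka-topics.sh --list --bootstrap-server remote-msk-kakfa-brokers:9092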