Confluent 连接无法使用 MongoDB 创建接收器
Confluent connect unble to create sink using MongoDB
我正在实施 Kafka MongoDB 连接器以在 MongoDB 中接收数据。使用文档我已经配置了一个连接器。但是无法连接它。我从这样的连接中得到一个日志。所有组件都在 docker 中,我的数据库安装在本地计算机中。
[2021-06-08 12:14:14,232] INFO Cluster created with settings {hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:14,262] INFO MongoSinkTopicConfig values:
change.data.capture.handler =
collection = test
database = admin
delete.on.null.values = false
document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
document.id.strategy.overwrite.existing = false
document.id.strategy.partial.key.projection.list =
document.id.strategy.partial.key.projection.type =
document.id.strategy.partial.value.projection.list =
document.id.strategy.partial.value.projection.type =
document.id.strategy.uuid.format = string
errors.log.enable = false
errors.tolerance = none
field.renamer.mapping = []
field.renamer.regexp = []
key.projection.list =
key.projection.type = none
max.batch.size = 0
max.num.retries = 1
namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
namespace.mapper.error.if.invalid = false
namespace.mapper.key.collection.field =
namespace.mapper.key.database.field =
namespace.mapper.value.collection.field =
namespace.mapper.value.database.field =
post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
rate.limiting.every.n = 0
rate.limiting.timeout = 0
retries.defer.timeout = 5000
topic = test
value.projection.list =
value.projection.type = none
writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
(com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,265] INFO MongoSinkTopicConfig values:
change.data.capture.handler =
collection = test
database = admin
delete.on.null.values = false
document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
document.id.strategy.overwrite.existing = false
document.id.strategy.partial.key.projection.list =
document.id.strategy.partial.key.projection.type =
document.id.strategy.partial.value.projection.list =
document.id.strategy.partial.value.projection.type =
document.id.strategy.uuid.format = string
errors.log.enable = false
errors.tolerance = none
field.renamer.mapping = []
field.renamer.regexp = []
key.projection.list =
key.projection.type = none
max.batch.size = 0
max.num.retries = 1
namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
namespace.mapper.error.if.invalid = false
namespace.mapper.key.collection.field =
namespace.mapper.key.database.field =
namespace.mapper.value.collection.field =
namespace.mapper.value.database.field =
post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
rate.limiting.every.n = 0
rate.limiting.timeout = 0
retries.defer.timeout = 5000
topic = test
value.projection.list =
value.projection.type = none
writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
(com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,275] INFO Cluster created with settings {hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:16,269] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
... 4 more
[2021-06-08 12:14:16,316] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
... 4 more
我检查过 docker 图片是否可以 ping 我的本地 IP。我还验证了我的 MongoDB up 和 运行.
谁能帮我解决这个问题?任何帮助表示赞赏。谢谢。
出现问题只是因为我使用不同的 IP 连接 MongoDB。所以默认情况下 127.0.0.1 被列入白名单,而不是我们必须列入白名单。所以在mongo配置中使用bindIp
属性我们可以实现这个。
net:
port: 27017
bindIp: 127.0.0.1,0.0.0.0
注意:这里我把所有的ip都加入了白名单。您可以根据需要列入白名单。在生产环境中,通配符白名单是不可取的。在 MongoDB 配置文件中更改 属性 后重新启动它。
我正在实施 Kafka MongoDB 连接器以在 MongoDB 中接收数据。使用文档我已经配置了一个连接器。但是无法连接它。我从这样的连接中得到一个日志。所有组件都在 docker 中,我的数据库安装在本地计算机中。
[2021-06-08 12:14:14,232] INFO Cluster created with settings {hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:14,262] INFO MongoSinkTopicConfig values:
change.data.capture.handler =
collection = test
database = admin
delete.on.null.values = false
document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
document.id.strategy.overwrite.existing = false
document.id.strategy.partial.key.projection.list =
document.id.strategy.partial.key.projection.type =
document.id.strategy.partial.value.projection.list =
document.id.strategy.partial.value.projection.type =
document.id.strategy.uuid.format = string
errors.log.enable = false
errors.tolerance = none
field.renamer.mapping = []
field.renamer.regexp = []
key.projection.list =
key.projection.type = none
max.batch.size = 0
max.num.retries = 1
namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
namespace.mapper.error.if.invalid = false
namespace.mapper.key.collection.field =
namespace.mapper.key.database.field =
namespace.mapper.value.collection.field =
namespace.mapper.value.database.field =
post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
rate.limiting.every.n = 0
rate.limiting.timeout = 0
retries.defer.timeout = 5000
topic = test
value.projection.list =
value.projection.type = none
writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
(com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,265] INFO MongoSinkTopicConfig values:
change.data.capture.handler =
collection = test
database = admin
delete.on.null.values = false
document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
document.id.strategy.overwrite.existing = false
document.id.strategy.partial.key.projection.list =
document.id.strategy.partial.key.projection.type =
document.id.strategy.partial.value.projection.list =
document.id.strategy.partial.value.projection.type =
document.id.strategy.uuid.format = string
errors.log.enable = false
errors.tolerance = none
field.renamer.mapping = []
field.renamer.regexp = []
key.projection.list =
key.projection.type = none
max.batch.size = 0
max.num.retries = 1
namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
namespace.mapper.error.if.invalid = false
namespace.mapper.key.collection.field =
namespace.mapper.key.database.field =
namespace.mapper.value.collection.field =
namespace.mapper.value.database.field =
post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
rate.limiting.every.n = 0
rate.limiting.timeout = 0
retries.defer.timeout = 5000
topic = test
value.projection.list =
value.projection.type = none
writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
(com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,275] INFO Cluster created with settings {hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:16,269] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
... 4 more
[2021-06-08 12:14:16,316] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
... 4 more
我检查过 docker 图片是否可以 ping 我的本地 IP。我还验证了我的 MongoDB up 和 运行.
谁能帮我解决这个问题?任何帮助表示赞赏。谢谢。
出现问题只是因为我使用不同的 IP 连接 MongoDB。所以默认情况下 127.0.0.1 被列入白名单,而不是我们必须列入白名单。所以在mongo配置中使用bindIp
属性我们可以实现这个。
net:
port: 27017
bindIp: 127.0.0.1,0.0.0.0
注意:这里我把所有的ip都加入了白名单。您可以根据需要列入白名单。在生产环境中,通配符白名单是不可取的。在 MongoDB 配置文件中更改 属性 后重新启动它。