使用独立模式 Kafka-connect 从 Postgres SQL 更改数据捕获到 kafka 主题
Change-data-capture from Postgres SQL to kafka topics using standalone mode Kafka-connect
我一直在尝试使用以下命令从 postgres sql 获取数据到 kafka 主题 /bin connect-standalone.properties config/connect-standalone.properties postgres.sproperties,但是我面临几个问题
这是我的 postgres.properties 文件的内容:
name=cdc_demo
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=decoderbufs
slot.name=debezium
slot.drop_on_stop=false
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=XXXXX
database.dbname=snehildb
time.precision.mode=adaptive
database.sslmode=disable
database.server.name=localhost:5432/snehildb
table.whitelist=public.students
decimal.handling.mode=precise
topic.creation.enable=true`
这里是connect-standalone.properties的内容:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every
Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into
Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the
converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for
plugins
# (connectors, converters, transformations). The list should consist of top level directories that
include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and
their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/azureuser/plugins
我收到了几个警告,但我无法解决以下三个主要错误:
ERROR Postgres server wal_level property must be "logical" but is: replica
(io.debezium.connector.postgresql.PostgresConnector:101)
(org.apache.kafka.common.config.AbstractConfig:361)
ERROR Failed to create job for config/postgres.properties
(org.apache.kafka.connect.cli.ConnectStandalone:110)
ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
我是 Kafka 的新手,如果有人能指出我的错误,那将非常有帮助。
Debezium 要求 wal_level 为 logical
:
https://www.postgresql.org/docs/9.6/runtime-config-wal.html
查看 class 处的 postgres 连接器内部:
io.debezium.connector.postgresql.PostgresConnector.java 在 debeizum 仓库中:
我一直在尝试使用以下命令从 postgres sql 获取数据到 kafka 主题 /bin connect-standalone.properties config/connect-standalone.properties postgres.sproperties,但是我面临几个问题 这是我的 postgres.properties 文件的内容:
name=cdc_demo
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=decoderbufs
slot.name=debezium
slot.drop_on_stop=false
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=XXXXX
database.dbname=snehildb
time.precision.mode=adaptive
database.sslmode=disable
database.server.name=localhost:5432/snehildb
table.whitelist=public.students
decimal.handling.mode=precise
topic.creation.enable=true`
这里是connect-standalone.properties的内容:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every
Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into
Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the
converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for
plugins
# (connectors, converters, transformations). The list should consist of top level directories that
include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and
their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/azureuser/plugins
我收到了几个警告,但我无法解决以下三个主要错误:
ERROR Postgres server wal_level property must be "logical" but is: replica
(io.debezium.connector.postgresql.PostgresConnector:101)
(org.apache.kafka.common.config.AbstractConfig:361)
ERROR Failed to create job for config/postgres.properties
(org.apache.kafka.connect.cli.ConnectStandalone:110)
ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
我是 Kafka 的新手,如果有人能指出我的错误,那将非常有帮助。
Debezium 要求 wal_level 为 logical
:
https://www.postgresql.org/docs/9.6/runtime-config-wal.html
查看 class 处的 postgres 连接器内部:
io.debezium.connector.postgresql.PostgresConnector.java 在 debeizum 仓库中: