如何指定连接器是源还是汇?

How to specify whether a connector is a source or a sink?

我目前正在配置 kafka connect(使用 debezium/connect docker image),我使用环境变量成功地将它连接到 Kafka:

docker run -it --rm --name AAAAAA-kafka-connect -p 8083:8083 \
    -v aaaaa.jks:aaaaa.jks \
    -v bbbbbb.jks:bbbbbb.jks \
    -e LOG_LEVEL=INFO \
    -e HOST_NAME="AAAAAA-kafka-connect" \
    -e HEAP_OPTS="-Xms256m -Xmx2g" \
    -e BOOTSTRAP_SERVERS="BBBBB:9092" \
    -e CONNECT_CLIENT_ID="xxx-kafka-connect" \
    -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"...\" password=\"...\";" \
    -e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
    -e CONNECT_SASL_MECHANISM="PLAIN" \
    -e CONNECT_SSL_TRUSTSTORE_LOCATION="bbbbbb.jks" \
    -e CONNECT_SSL_TRUSTSTORE_PASSWORD="..." \
    -e CONNECT_SSL_KEYSTORE_LOCATION="aaaaa.jks" \
    -e CONNECT_SSL_KEYSTORE_PASSWORD="..." \
    -e GROUP_ID="XXX.grp.kafka.connect" \
    -e CONFIG_STORAGE_TOPIC="XXX.connect.configs.v1" \
    -e OFFSET_STORAGE_TOPIC="XXX.connect.offsets.v1" \
    -e STATUS_STORAGE_TOPIC="XXX.connect.statuses.v1" \
    quay.io/debezium/connect:1.9

现在我必须创建一个源连接器 (posgresql db),我希望 kafka 连接器将从源中抓取的数据汇入 kafka 主题中。

由于数据库连接器的 json 配置中没有这样的配置,我必须在哪里设置接收器的 kafka 配置?

我必须创建一个连接到 kafka 主题的接收器连接器吗?如果是这样,我们在哪里指定这是接收器还是源连接器?

PS: 我已经创建了我想将数据放入其中的 kafka 主题

欢迎提问

好的,你必须添加 CONNECT_PRODUCER_*CONNECT_CONSUMER_* 环境变量来指定源或接收器的配置!!!!!!

像这样:

docker run -it --rm --name AAAAAA-kafka-connect -p 8083:8083 \
    -v aaaaa.jks:aaaaa.jks \
    -v bbbbbb.jks:bbbbbb.jks \
    -e LOG_LEVEL=INFO \
    -e HOST_NAME="AAAAAA-kafka-connect" \
    -e HEAP_OPTS="-Xms256m -Xmx2g" \
    -e BOOTSTRAP_SERVERS="BBBBB:9092" \
    -e CONNECT_CLIENT_ID="xxx-kafka-connect" \
    -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"...\" password=\"...\";" \
    -e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
    -e CONNECT_SASL_MECHANISM="PLAIN" \
    -e CONNECT_SSL_TRUSTSTORE_LOCATION="bbbbbb.jks" \
    -e CONNECT_SSL_TRUSTSTORE_PASSWORD="..." \
    -e CONNECT_SSL_KEYSTORE_LOCATION="aaaaa.jks" \
    -e CONNECT_SSL_KEYSTORE_PASSWORD="..." \
    -e GROUP_ID="XXX.grp.kafka.connect" \
    -e CONFIG_STORAGE_TOPIC="XXX.connect.configs.v1" \
    -e OFFSET_STORAGE_TOPIC="XXX.connect.offsets.v1" \
    -e STATUS_STORAGE_TOPIC="XXX.connect.statuses.v1" \
    -e CONNECT_PRODUCER_TOPIC_CREATION_ENABLE=false \
    -e CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"...\" password=\"...\";" \
    -e CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_SSL" \
    -e CONNECT_PRODUCER_SASL_MECHANISM="PLAIN" \
    -e CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION="bbbbbb.jks" \
    -e CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD="..." \
    -e CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION="aaaaa.jks" \
    -e CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD="..." \
    -e CONNECT_PRODUCER_CLIENT_ID="xxx-kafka-connect" \
    -e CONNECT_PRODUCER_TOPIC_CREATION_ENABLE=false \
    quay.io/debezium/connect:1.9

sinksource属性来自连接器json定义中使用的connector.class。但是,Debeziums CDC 连接器只能用作从外部数据库系统 (https://hevodata.com/learn/debezium-vs-kafka-connect/#:~:text=Debezium%20platform%20has%20a%20vast,records%20from%20external%20database%20systems.)

捕获 real-time 事件更改记录的源连接器

环境变量只修改客户端参数。

Source 和 Sinks 是在您实际创建连接器时确定的。您需要一个 JSON 配置,它将有一个 connector.class.

在 Kafka API 中有 SinkTask and SourceTask.

Debezium 总是 一个来源。消息来源写给卡夫卡;这不会使卡夫卡成为一个水槽。您需要安装一个新的连接器插件来为您的数据库 获取一个接收器 ,例如来自 Confluent 的 JDBC 连接器,它具有用于源和接收器的 类。