Debezium 连接器任务处于未分配状态
Debezium connector task is in unassigned state
今天 3 个节点中有一个节点不同步并重新启动。
现在,当我检查连接器任务的状态时,它显示为 UNASSIGNED,即使连接器处于 运行 状态。
工作人员运行处于分布式模式。
我尝试重新启动连接器,但它仍然是未分配的并且指向被带回集群的同一个工作节点。
以下是我的其中一名工人的属性文件,所有工人都相同:
bootstrap.servers=something:9092
group.id=ffb-supply-kafka-connect
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=supply-kafka-connect-offsets
config.storage.topic=supply-kafka-connect-configs
status.storage.topic=supply-kafka-connect-status
plugin.path=/var/lib/3p-kafka/connect-plugins/,/usr/share/3p-kafka/libs
连接器配置:
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "minimal",
"transforms.insertKey.fields": "external_shipment_id",
"tasks.max": "1",
"database.history.kafka.topic": "seller_oms_log_history_20220304200010",
"transforms": "unwrap,insertKey,extractKey,alterKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"include.schema.changes": "false",
"table.whitelist": "seller_oms.shipment_log",
"database.history.kafka.recovery.poll.interval.ms": "5000",
"transforms.unwrap.drop.tombstones": "false",
"database.history.skip.unparseable.ddl": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.whitelist": "seller_oms",
"transforms.alterKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.user": "cdc_user",
"transforms.extractKey.field": "external_shipment_id",
"database.server.id": "20220304200010",
"transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"database.history.kafka.bootstrap.servers": "XX:9092,YY:9092,ZZ:9092",
"transforms.alterKey.renames": "__source_ts_ms:updated_by_debezium_at",
"database.server.name": "seller_oms_log",
"heartbeat.interval.ms": "5000",
"database.port": "3306",
"key.converter.schemas.enable": "false",
"database.hostname": "master.in",
"database.password": "XXYYSS",
"value.converter.schemas.enable": "false",
"name": "shipment-log-connector-20220304200010",
"errors.tolerance": "all",
"transforms.unwrap.add.fields": "source.ts_ms",
"snapshot.mode": "schema_only"
我创建了一个具有相同配置的新连接器,它在现有连接器失败的同一个工作节点上工作正常。
我不确定为什么旧管道没有出现并进入 运行 任务状态。
当节点与集群断开连接时,是否需要做一些事情,一旦连接器返回,如何恢复该连接器?
根据我的理解,如果一个工人宕机,它应该自动将任务分配给另一个工人。
resume that connector once it is back?
在 运行 connect-distributed 脚本之后,它应该重新平衡任务。
否则,有一个 /restart API 端点
if one worker goes down, it should Automatically assign the task to another worker
根据我的经验,当任务实际失败时,它们仍然失败,并且需要重新启动端点,如果它是临时故障并且日志没有显示任何有用的信息。但是,您的 errors.tolerance
设置可能有助于在一定程度上隔离问题
今天 3 个节点中有一个节点不同步并重新启动。 现在,当我检查连接器任务的状态时,它显示为 UNASSIGNED,即使连接器处于 运行 状态。 工作人员运行处于分布式模式。
我尝试重新启动连接器,但它仍然是未分配的并且指向被带回集群的同一个工作节点。
以下是我的其中一名工人的属性文件,所有工人都相同:
bootstrap.servers=something:9092
group.id=ffb-supply-kafka-connect
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=supply-kafka-connect-offsets
config.storage.topic=supply-kafka-connect-configs
status.storage.topic=supply-kafka-connect-status
plugin.path=/var/lib/3p-kafka/connect-plugins/,/usr/share/3p-kafka/libs
连接器配置:
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "minimal",
"transforms.insertKey.fields": "external_shipment_id",
"tasks.max": "1",
"database.history.kafka.topic": "seller_oms_log_history_20220304200010",
"transforms": "unwrap,insertKey,extractKey,alterKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"include.schema.changes": "false",
"table.whitelist": "seller_oms.shipment_log",
"database.history.kafka.recovery.poll.interval.ms": "5000",
"transforms.unwrap.drop.tombstones": "false",
"database.history.skip.unparseable.ddl": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.whitelist": "seller_oms",
"transforms.alterKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.user": "cdc_user",
"transforms.extractKey.field": "external_shipment_id",
"database.server.id": "20220304200010",
"transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"database.history.kafka.bootstrap.servers": "XX:9092,YY:9092,ZZ:9092",
"transforms.alterKey.renames": "__source_ts_ms:updated_by_debezium_at",
"database.server.name": "seller_oms_log",
"heartbeat.interval.ms": "5000",
"database.port": "3306",
"key.converter.schemas.enable": "false",
"database.hostname": "master.in",
"database.password": "XXYYSS",
"value.converter.schemas.enable": "false",
"name": "shipment-log-connector-20220304200010",
"errors.tolerance": "all",
"transforms.unwrap.add.fields": "source.ts_ms",
"snapshot.mode": "schema_only"
我创建了一个具有相同配置的新连接器,它在现有连接器失败的同一个工作节点上工作正常。 我不确定为什么旧管道没有出现并进入 运行 任务状态。 当节点与集群断开连接时,是否需要做一些事情,一旦连接器返回,如何恢复该连接器? 根据我的理解,如果一个工人宕机,它应该自动将任务分配给另一个工人。
resume that connector once it is back?
在 运行 connect-distributed 脚本之后,它应该重新平衡任务。
否则,有一个 /restart API 端点
if one worker goes down, it should Automatically assign the task to another worker
根据我的经验,当任务实际失败时,它们仍然失败,并且需要重新启动端点,如果它是临时故障并且日志没有显示任何有用的信息。但是,您的 errors.tolerance
设置可能有助于在一定程度上隔离问题