Debezium connector task is in UNASSIGNED state

Today one of my 3 nodes went out of sync and was restarted. Now when I check the status of the connector's task, it shows UNASSIGNED even though the connector itself is RUNNING. The workers run in distributed mode.

I tried restarting the connector, but the task is still UNASSIGNED and still points to the same worker node that was brought back into the cluster.
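For reference, this is how I query the task-level state through the Connect REST API; the worker address below is a placeholder for one of my workers:

```shell
# Placeholder worker address -- replace with one of your Connect workers.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
CONNECTOR="shipment-log-connector-20220304200010"

# Shows the connector state plus the per-task state;
# a FAILED task also carries its stack trace in this response:
curl -s "$CONNECT_URL/connectors/$CONNECTOR/status" || echo "worker not reachable"
```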

Below is the properties file from one of my workers; it is identical on all workers:

bootstrap.servers=something:9092
group.id=ffb-supply-kafka-connect
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=supply-kafka-connect-offsets
config.storage.topic=supply-kafka-connect-configs
status.storage.topic=supply-kafka-connect-status
plugin.path=/var/lib/3p-kafka/connect-plugins/,/usr/share/3p-kafka/libs

Connector configuration:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "minimal",
  "transforms.insertKey.fields": "external_shipment_id",
  "tasks.max": "1",
  "database.history.kafka.topic": "seller_oms_log_history_20220304200010",
  "transforms": "unwrap,insertKey,extractKey,alterKey",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "include.schema.changes": "false",
  "table.whitelist": "seller_oms.shipment_log",
  "database.history.kafka.recovery.poll.interval.ms": "5000",
  "transforms.unwrap.drop.tombstones": "false",
  "database.history.skip.unparseable.ddl": "true",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.whitelist": "seller_oms",
  "transforms.alterKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.user": "cdc_user",
  "transforms.extractKey.field": "external_shipment_id",
  "database.server.id": "20220304200010",
  "transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "database.history.kafka.bootstrap.servers": "XX:9092,YY:9092,ZZ:9092",
  "transforms.alterKey.renames": "__source_ts_ms:updated_by_debezium_at",
  "database.server.name": "seller_oms_log",
  "heartbeat.interval.ms": "5000",
  "database.port": "3306",
  "key.converter.schemas.enable": "false",
  "database.hostname": "master.in",
  "database.password": "XXYYSS",
  "value.converter.schemas.enable": "false",
  "name": "shipment-log-connector-20220304200010",
  "errors.tolerance": "all",
  "transforms.unwrap.add.fields": "source.ts_ms",
  "snapshot.mode": "schema_only"
}

I created a new connector with the same configuration and it works fine on the very worker node where the existing connector fails. I'm not sure why the old pipeline doesn't come back up and move into the RUNNING task state. Is there something that needs to be done when a node disconnects from the cluster, and how do I resume that connector once it is back? My understanding is that if one worker goes down, the task should automatically be reassigned to another worker.

resume that connector once it is back?

After running the connect-distributed script, it should rebalance the tasks.

Otherwise, there is a /restart REST API endpoint.
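A sketch of the restart calls (the worker address is a placeholder; the task index is 0 since tasks.max=1). Note that on older Connect versions, restarting the connector alone does not restart its tasks:

```shell
CONNECT_URL="http://localhost:8083"   # placeholder; point at one of your workers
NAME="shipment-log-connector-20220304200010"

# Restart the connector instance only (on older versions this does not touch tasks):
curl -s -X POST "$CONNECT_URL/connectors/$NAME/restart" || true

# Restart a specific task -- usually what an UNASSIGNED/FAILED task needs:
curl -s -X POST "$CONNECT_URL/connectors/$NAME/tasks/0/restart" || true

# On Connect 3.0+ (KIP-745, if I recall correctly), restart connector and tasks together:
curl -s -X POST "$CONNECT_URL/connectors/$NAME/restart?includeTasks=true" || true
```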

if one worker goes down, it should automatically assign the task to another worker

In my experience, when tasks actually fail they stay FAILED and need the restart endpoint, assuming it was a transient failure and the logs don't show anything useful. However, your errors.tolerance setting may help isolate the problem to some extent.