How to make the Confluent SFTP connector repeatedly process a CSV file

Good day,

I am trying to follow this link for the Confluent SFTP connector: https://docs.confluent.io/kafka-connect-sftp/current/source-connector/index.html

Below is my sftp.json. It is basically the same as the documented example, with only my local details filled in:

{
  "name": "CsvSFTP",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy":"MOVE",
    "behavior.on.error":"IGNORE",
    "input.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1",
    "error.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/error",
    "finished.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/finished",
    "input.file.pattern": "csv-sftp-source.csv",
    "sftp.username":"meow",
    "sftp.password":"password",
    "sftp.host":"localhost",
    "sftp.port":"22",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}

After that, I run the following command to load the connector:

confluent local services connect connector load CsvSFTP --config sftp.json
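
To confirm the connector actually loaded, its status can be checked with the same CLI (a quick sanity check; subcommand availability may vary by Confluent CLI version):

confluent local services connect connector status CsvSFTP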

Next, I upload the same CSV file to the input folder. And yes, I see the file disappear and move to the finished.path.

I use the following consumer command to check the data pushed to the topic:

[meow@localhost bin]$ ./kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic sftp-testing-topic2 \
    --from-beginning
{"id":{"string":"1"},"first_name":{"string":"Salmon"},"last_name":{"string":"Baitman"},"email":{"string":"sbaitman0@feedburner.com"},"gender":{"string":"Male"},"ip_address":{"string":"120.181.75.98"},"last_login":{"string":"2015-03-01T06:01:15Z"},"account_balance":{"string":"17462.66"},"country":{"string":"IT"},"favorite_color":{"string":"#f09bc0"}}
{"id":{"string":"2"},"first_name":{"string":"Debby"},"last_name":{"string":"Brea"},"email":{"string":"dbrea1@icio.us"},"gender":{"string":"Female"},"ip_address":{"string":"153.239.187.49"},"last_login":{"string":"2018-10-21T12:27:12Z"},"account_balance":{"string":"14693.49"},"country":{"string":"CZ"},"favorite_color":{"string":"#73893a"}}

So far so good; everything works fine at this point.

After that, I take the same CSV file and edit the first name from 'Salmon' to 'Salmon2'. Then I upload the CSV file again, but this time the file is not processed. When I check the connector status, it is RUNNING, and even when I check connect.log, I only see it print that no records were produced:

[2022-03-16 17:14:22,129] INFO [CsvSFTP|task-0|offsets] WorkerSourceTask{id=CsvSFTP2-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
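
As an aside, connector and task status can also be queried directly from the Kafka Connect REST API (8083 is the default worker port; adjust if your worker listens elsewhere):

curl -s http://localhost:8083/connectors/CsvSFTP/status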

Then I unload the connector and load it again, and I see the file disappear once more and move to the finished.path. I expected the consumer to print another 2 records, one of them containing my change to the first name, i.e. 'Salmon2', but it did not; the consumer output stayed the same.
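
For reference, the unload/reload cycle uses the same CLI as the load step above (assuming the same Confluent CLI version):

confluent local services connect connector unload CsvSFTP
confluent local services connect connector load CsvSFTP --config sftp.json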

Am I doing something wrong, or is this the expected result?

This sounds like expected behavior. Source connectors (mostly) maintain their state in the offsets topic. Once the connector has processed the file, it records that it should not process it again, even if the connector is restarted or otherwise reloaded.
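
You can see that state by reading the Connect offsets topic directly. A minimal sketch, assuming Connect runs in distributed mode with the default offsets topic name connect-offsets (check offset.storage.topic in your worker config; standalone mode stores offsets in a file, offset.storage.file.filename, instead):

./kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic connect-offsets \
    --property print.key=true \
    --from-beginning

Each record's key names the connector and the source partition (here, the file), and the value carries the position within it.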

You would need to modify this state, or change the name of the connector to something unique, in order to "start over".
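
Two concrete ways to "start over", sketched under the same assumptions as above. The offset key shown is illustrative only: the SFTP connector's exact partition fields may differ, so copy the real key from the connect-offsets output rather than typing it from memory.

# Option 1: rename the connector in sftp.json so it starts with no stored
# offsets, then load it under the new name:
#     "name": "CsvSFTP2",

# Option 2: delete the stored offset by producing a tombstone (null value)
# for the file's key to the offsets topic, e.g. with kafkacat:
echo '["CsvSFTP",{"fileName":"csv-sftp-source.csv"}]#' | \
    kafkacat -b localhost:9092 -t connect-offsets -P -Z -K '#'

After writing the tombstone, unload and load the connector again so the task re-reads its (now empty) offsets.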