How to make the Confluent SFTP connector repeatedly process a CSV file
Good day,
I am trying to follow this guide for the Confluent SFTP connector:
https://docs.confluent.io/kafka-connect-sftp/current/source-connector/index.html
Below is my sftp.json. It is basically unchanged from the example; I only filled in my local details:
{
  "name": "CsvSFTP",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy": "MOVE",
    "behavior.on.error": "IGNORE",
    "input.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1",
    "error.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/error",
    "finished.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/finished",
    "input.file.pattern": "csv-sftp-source.csv",
    "sftp.username": "meow",
    "sftp.password": "password",
    "sftp.host": "localhost",
    "sftp.port": "22",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}
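For reference, the CSV file used here would look roughly like this (reconstructed from the consumer output shown further below; the first row is a header, matching csv.first.row.as.header=true):

id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color
1,Salmon,Baitman,sbaitman0@feedburner.com,Male,120.181.75.98,2015-03-01T06:01:15Z,17462.66,IT,#f09bc0
2,Debby,Brea,dbrea1@icio.us,Female,153.239.187.49,2018-10-21T12:27:12Z,14693.49,CZ,#73893a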
After this, I run the following command to load the connector:
confluent local services connect connector load CsvSFTP --config sftp.json
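To confirm the connector and its task actually started, the worker's status can be queried; a minimal sketch, assuming the default Kafka Connect REST port 8083 used by confluent local:

curl -s http://localhost:8083/connectors/CsvSFTP/status

A healthy connector reports "state": "RUNNING" for both the connector and its single task.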
Next, I upload the sample CSV file to the input folder. Sure enough, I see the file disappear and get moved to finished.path.
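In case it helps to reproduce, the upload was done with a plain SFTP client; a minimal sketch, assuming the OpenSSH sftp command and the input.path from the config above:

sftp meow@localhost
sftp> put csv-sftp-source.csv /home/meow/Workspace/confluentPlatform/confluent-7.0.1/
sftp> exit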
I use the following consumer command to check the data pushed to the topic:
[meow@localhost bin]$ ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic sftp-testing-topic2 --from-beginning
{"id":{"string":"1"},"first_name":{"string":"Salmon"},"last_name":{"string":"Baitman"},"email":{"string":"sbaitman0@feedburner.com"},"gender":{"string":"Male"},"ip_address":{"string":"120.181.75.98"},"last_login":{"string":"2015-03-01T06:01:15Z"},"account_balance":{"string":"17462.66"},"country":{"string":"IT"},"favorite_color":{"string":"#f09bc0"}}
{"id":{"string":"2"},"first_name":{"string":"Debby"},"last_name":{"string":"Brea"},"email":{"string":"dbrea1@icio.us"},"gender":{"string":"Female"},"ip_address":{"string":"153.239.187.49"},"last_login":{"string":"2018-10-21T12:27:12Z"},"account_balance":{"string":"14693.49"},"country":{"string":"CZ"},"favorite_color":{"string":"#73893a"}}
So far so good; everything works as expected at this point.
After this, I take the same CSV file and edit the first name from 'Salmon' to 'Salmon2'. Then I upload the CSV file again, but this time the file is not processed. When I check the connector status, it is RUNNING, and even in connect.log I only see it print that no records were produced:
[2022-03-16 17:14:22,129] INFO [CsvSFTP|task-0|offsets] WorkerSourceTask{id=CsvSFTP2-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
Then I unload the connector and load it again, and I see the file disappear and move to finished.path once more. I expected the consumer to print 2 more records, one of them containing my change to the first name, i.e. 'Salmon2', but it did not; the consumer output stayed the same.
Am I doing something wrong, or is this the expected result?
This sounds like expected behavior. Source connectors (mostly) maintain their state in the offsets topic. Once the connector has processed that file, it keeps track of the fact so that it will not do it again, even if the connector is restarted or otherwise reloaded.
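You can see that state directly; a minimal sketch, assuming confluent local uses the distributed-mode default offsets topic name connect-offsets (check offset.storage.topic in your worker config if not). The record keys embed the connector name and a source partition:

./kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic connect-offsets \
  --property print.key=true \
  --from-beginning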
You would need to modify this state, or change the connector's name so that it is unique, in order to "start over".
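The least invasive way is the rename: unload, change the "name" field in sftp.json (CsvSFTP2 below is just a hypothetical new name), and load again:

confluent local services connect connector unload CsvSFTP
confluent local services connect connector load CsvSFTP2 --config sftp.json

Alternatively, you can clear the stored state by producing a tombstone (null value) for the connector's key into the offsets topic while the connector is unloaded. A sketch using kcat, whose -Z flag turns an empty value into NULL; the key here is only a placeholder and must be copied verbatim from the connect-offsets consumer output above:

echo '["CsvSFTP",{"placeholder":"copy-the-real-key"}]#' | \
  kcat -P -b localhost:9092 -t connect-offsets -Z -K '#'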