Kafka Connect SftpCsvSourceConnector schema configuration

I am trying to set up the SftpCsvSourceConnector in my local environment, but I am running into problems defining the schemas for the connector. This is what I am trying to do:


 curl -i -X PUT -H "Accept:application/json" \
 -H "Content-Type:application/json" http://localhost:8083/connectors/nc-csv-02/config \
 -d '{
    "tasks.max" : "1",
    "connector.class" : "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "kafka.topic": "sftp-csv-00",
    "cleanup.policy":"NONE",
    "behavior.on.error":"IGNORE",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "input.path" : "/",
    "csv.separator.char" : 59,
    "finished.path" : "/finished",
    "error.path" : "/error",
    "schema.generation.key.fields" : "msisdn",
    "input.file.pattern" : ".*\\.dat",
    "schema.generation.enabled" : "false",
    "csv.first.row.as.header" : "true",
    "key.schema":"{\"fields\":[{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]}],\"name\":\"NCKeySchema\",\"type\":\"record\"}",
    "value.schema":"{\"name\":\"NCPortabilityMovementEvent\",\"type\":\"record\",\"fields\":[{\"default\":null,\"name\":\"action\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"previousNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"newNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"effectiveDate\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"referenceID\",\"type\":[\"null\",\"string\"]}]}",
    "sftp.username":"tester",
    "sftp.password":"password",
    "sftp.host":"192.168.1.2",
    "sftp.port":"22"
 }'

The exception I see in the worker task is:

org.apache.kafka.common.config.ConfigException: Invalid value com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "fields" (class com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage), not marked as ignorable (10 known properties: "defaultValue", "valueSchema", "doc", "type", "name", "keySchema", "version", "parameters", "isOptional", "fieldSchemas"])
 at [Source: (String)"{"fields":[{"default":null,"name":"msisdn","type":["null","string"]}],"name":"NCKeySchema","type":"record"}"; line: 1, column: 12] (through reference chain: com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage["fields"]) for configuration Could not read schema from 'key.schema'
    at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.readSchema(SftpSourceConnectorConfig.java:334)
    at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:117)
    at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:156)
    at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:44)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:185)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:210)
    at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
    at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:332)
    at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:141)
    at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)


The schemas I am trying to use for the key and value are:

{
  "fields": [
    {
      "default": null,
      "name": "msisdn",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "NCKeySchema",
  "type": "record"
}

{
    "name" : "NCPortabilityMovementEvent",
    "type" : "record",
    "fields" : [
        {
            "default" : null,
            "name" : "action",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "msisdn",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "previousNRN",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "newNRN",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "effectiveDate",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "referenceID",
            "type" : [
                "null",
                "string"
            ]
        }
    ]
}

What am I doing wrong?

I also tried this with schema.generation.enabled=true and key.schema and value.schema removed, and the connector worked just fine.

The Avro schemas you provided are not what the connector expects. You need to define Connect schemas, which use type=STRUCT together with a fieldSchemas map. The format itself is not well documented, but there is an example here: https://docs.confluent.io/kafka-connect-sftp/current/source-connector/csv_source_connector.html#sftp-connector-csv-with-schema-example
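As a sketch, going by the property names listed in the exception message (type, name, isOptional, fieldSchemas) and the linked example, your key schema would look something like this in the Connect schema JSON format:

```json
{
  "type": "STRUCT",
  "name": "NCKeySchema",
  "isOptional": false,
  "fieldSchemas": {
    "msisdn": {
      "type": "STRING",
      "isOptional": true
    }
  }
}
```

The nullable Avro union ["null","string"] maps to a STRING field with isOptional set to true; the value schema follows the same pattern, with one fieldSchemas entry per column.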

You can find the source code of the schema JSON deserializer here - https://github.com/jcustenborder/connect-utils/tree/master/connect-utils-jackson/src/main/java/com/github/jcustenborder/kafka/connect/utils/jackson
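Because key.schema and value.schema have to be embedded in the connector config as escaped single-line JSON strings, it is easier to generate them than to escape them by hand. A small Python sketch (the field names come from your value schema; the STRUCT/fieldSchemas shape follows the linked Confluent example):

```python
import json

# Connect schema for the value record: type=STRUCT with a fieldSchemas map,
# one optional STRING entry per CSV column of NCPortabilityMovementEvent.
value_schema = {
    "type": "STRUCT",
    "name": "NCPortabilityMovementEvent",
    "isOptional": False,
    "fieldSchemas": {
        name: {"type": "STRING", "isOptional": True}
        for name in ["action", "msisdn", "previousNRN", "newNRN",
                     "effectiveDate", "referenceID"]
    },
}

# Serializing the schema once gives the single-line string value; serializing
# the enclosing fragment again escapes the inner quotes for the REST payload.
config_fragment = {"value.schema": json.dumps(value_schema)}
print(json.dumps(config_fragment))
```

The printed fragment can be merged into the JSON body of the PUT request above.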