Kafka Connect SftpCsvSourceConnector schema configuration
I am trying to set up the SftpCsvSourceConnector in my local environment, and I am having some trouble setting the schema for the connector. This is what I am trying to do:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/nc-csv-02/config \
-d '{
"tasks.max" : "1",
"connector.class" : "io.confluent.connect.sftp.SftpCsvSourceConnector",
"kafka.topic": "sftp-csv-00",
"cleanup.policy":"NONE",
"behavior.on.error":"IGNORE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"input.path" : "/",
"csv.separator.char" : 59,
"finished.path" : "/finished",
"error.path" : "/error",
"schema.generation.key.fields" : "msisdn",
"input.file.pattern" : ".*\.dat",
"schema.generation.enabled" : "false",
"csv.first.row.as.header" : "true",
"key.schema":"{\"fields\":[{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]}],\"name\":\"NCKeySchema\",\"type\":\"record\"}",
"value.schema":"{\"name\":\"NCPortabilityMovementEvent\",\"type\":\"record\",\"fields\":[{\"default\":null,\"name\":\"action\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"previousNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"newNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"effectiveDate\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"referenceID\",\"type\":[\"null\",\"string\"]}]}",
"sftp.username":"tester",
"sftp.password":"password",
"sftp.host":"192.168.1.2",
"sftp.port":"22"
}'
The exception I see in the worker task is
org.apache.kafka.common.config.ConfigException: Invalid value com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "fields" (class com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage), not marked as ignorable (10 known properties: "defaultValue", "valueSchema", "doc", "type", "name", "keySchema", "version", "parameters", "isOptional", "fieldSchemas"])
at [Source: (String)"{"fields":[{"default":null,"name":"msisdn","type":["null","string"]}],"name":"NCKeySchema","type":"record"}"; line: 1, column: 12] (through reference chain: com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage["fields"]) for configuration Could not read schema from 'key.schema'
at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.readSchema(SftpSourceConnectorConfig.java:334)
at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:117)
at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:156)
at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:44)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:185)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:210)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:332)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:141)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
The schemas I attempted to use for the key and value are
{
"fields": [
{
"default": null,
"name": "msisdn",
"type": [
"null",
"string"
]
}
],
"name": "NCKeySchema",
"type": "record"
}
and
{
"name" : "NCPortabilityMovementEvent",
"type" : "record",
"fields" : [
{
"default" : null,
"name" : "action",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "msisdn",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "previousNRN",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "newNRN",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "effectiveDate",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "referenceID",
"type" : [
"null",
"string"
]
}
]
}
What am I doing wrong?
I also tried this with schema.generation.enabled=true and removed key.schema and value.schema, and the connector worked just fine.
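For reference, that working variant differed from the config above only in these keys (with key.schema and value.schema removed entirely):

"schema.generation.enabled" : "true",
"schema.generation.key.fields" : "msisdn"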
The Avro schemas you provided are not correct. You need to define Connect schemas instead, which use type=STRUCT with a fieldSchemas map. The format itself is not well documented, but there is an example here: https://docs.confluent.io/kafka-connect-sftp/current/source-connector/csv_source_connector.html#sftp-connector-csv-with-schema-example
You can find the source code of the schema JSON deserializer here - https://github.com/jcustenborder/connect-utils/tree/master/connect-utils-jackson/src/main/java/com/github/jcustenborder/kafka/connect/utils/jackson
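As a sketch based on the linked docs example (pretty-printed here for readability; verify property names against your connector version), your key schema translated into the Connect schema format would look like this:

{
  "name" : "NCKeySchema",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "msisdn" : { "type" : "STRING", "isOptional" : true }
  }
}

and the value schema like this:

{
  "name" : "NCPortabilityMovementEvent",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "action" : { "type" : "STRING", "isOptional" : true },
    "msisdn" : { "type" : "STRING", "isOptional" : true },
    "previousNRN" : { "type" : "STRING", "isOptional" : true },
    "newNRN" : { "type" : "STRING", "isOptional" : true },
    "effectiveDate" : { "type" : "STRING", "isOptional" : true },
    "referenceID" : { "type" : "STRING", "isOptional" : true }
  }
}

In the connector config each schema is passed as a single escaped JSON string, e.g.:

"key.schema" : "{\"name\":\"NCKeySchema\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"msisdn\":{\"type\":\"STRING\",\"isOptional\":true}}}"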