Unable to use sink connector inside Kafka Connect
I'm trying to use the S3 sink connector inside Kafka Connect. It starts, but fails a little later.
My configuration is as follows:
{
"name": "my-s3-sink3",
"config": {
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"1",
"topics":"mysource.topic",
"s3.region":"us-east-1",
"s3.bucket.name": "topicbucket001",
"s3.part.size":"5242880",
"flush.size":"1",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility":"NONE"
}
}
My connect-distributed.properties looks like this:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
errors.tolerance = all
Full error log:
[2021-04-06 10:59:04,398] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Member connector-consumer-s3connect12-0-f1e48df8-76ba-49f9-9080-e10b0a34202b sending LeaveGroup request to coordinator **********.kafka.us-east-1.amazonaws.com:9092 (id: 2147483645 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-04-06 10:59:04,397] ERROR WorkerSinkTask{id=s3connect12-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2021-04-06 10:59:04,396] ERROR WorkerSinkTask{id=s3connect12-0} Error converting message key in topic 'quickstart-status' partition 3 at offset 0 and timestamp 1617706740956: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-04-06 10:59:04,393] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Resetting offset for partition quickstart-status-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[***************.kafka.us-east-1.amazonaws.com:9092 (id: 1 rack: use1-az2)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
Message format:
{
"registertime": 1511985752912,
"userid": "User_6",
"regionid": "Region_8",
"gender": "FEMALE"
}
New error log:
The problem is with the key SerDe. Based on your screenshot, the key data is a non-JSON string:
User_2
User_9
etc.
So instead of
key.converter=org.apache.kafka.connect.json.JsonConverter
use
key.converter=org.apache.kafka.connect.storage.StringConverter
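To see why the key fails while the value succeeds, here is a minimal Python sketch. The two helpers are hypothetical stand-ins that only loosely mimic the converters (they are not the Kafka Connect code): JsonConverter with schemas.enable=false has to parse the raw bytes as JSON, and a bare User_6 is not valid JSON, whereas StringConverter just decodes the bytes as UTF-8.

```python
import json

# Hypothetical helpers that loosely mimic how each converter treats raw record bytes.
# They are illustrations only, not the Kafka Connect implementations.

def json_converter_to_connect(raw: bytes):
    """Roughly what JsonConverter (schemas.enable=false) does: parse the bytes as JSON."""
    try:
        return json.loads(raw.decode("utf-8"))
    except json.JSONDecodeError as exc:
        # Kafka Connect reports this case as "Converting byte[] to Kafka Connect
        # data failed due to serialization error".
        raise ValueError(f"serialization error: {exc}") from exc


def string_converter_to_connect(raw: bytes) -> str:
    """Roughly what StringConverter does: decode the bytes as a UTF-8 string."""
    return raw.decode("utf-8")


key = b"User_6"
value = b'{"registertime": 1511985752912, "userid": "User_6", "regionid": "Region_8", "gender": "FEMALE"}'

# The value is a JSON object, so JSON parsing works.
print(json_converter_to_connect(value)["userid"])  # prints User_6

# The key is a bare string without quotes, so JSON parsing fails...
try:
    json_converter_to_connect(key)
except ValueError as err:
    print("JsonConverter:", err)

# ...while plain string decoding accepts it unchanged.
print("StringConverter:", string_converter_to_connect(key))
```

Note that a key written as the quoted JSON string "User_6" would parse; the failure comes specifically from the producer writing the key as raw text.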
Edit:
Try this for your connector configuration, specifying the converters explicitly (as suggested by @OneCricketeer):
{
"name": "my-s3-sink3",
"config": {
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"tasks.max" : "1",
"topics" : "mysource.topic",
"s3.region" : "us-east-1",
"s3.bucket.name" : "topicbucket001",
"s3.part.size" : "5242880",
"flush.size" : "1",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility" : "NONE"
}
}
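One way to apply this configuration is through the Kafka Connect REST API. The sketch below only builds a PUT /connectors/{name}/config request with Python's standard library; that endpoint creates the connector or updates it in place, and it takes the inner "config" map rather than the {"name", "config"} wrapper shown above. The worker address is an assumption (8083 is the default Connect REST port). Converters set at the connector level like this override the worker defaults in connect-distributed.properties.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API is reachable here; adjust for your deployment.
CONNECT_URL = "http://localhost:8083"

connector_name = "my-s3-sink3"
config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "mysource.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "topicbucket001",
    "s3.part.size": "5242880",
    "flush.size": "1",
    # Connector-level converters take precedence over the worker-level defaults.
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
}


def build_put_config_request(base_url: str, name: str, cfg: dict) -> urllib.request.Request:
    """Build a PUT /connectors/{name}/config request (create-or-update)."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(cfg).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = build_put_config_request(CONNECT_URL, connector_name, config)

# Uncomment to send it to a running worker:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```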
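So I was able to resolve this. After specifying the converters explicitly, the deserialization error went away. I then ran into an S3 multipart upload issue, which I fixed by attaching an S3 IAM policy to the ECS task definition, granting the Fargate task permissions on the S3 bucket.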
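For reference, the sketch below generates an IAM policy document of the kind described, scoped to the bucket from the connector config. The action list is an assumption modeled on the permissions Confluent documents for the S3 sink, so check it against the current connector documentation. In ECS, such a policy would typically go on the IAM role referenced as the task role (taskRoleArn) in the task definition. Multipart uploads (create, upload part, complete) are authorized by s3:PutObject.

```python
import json

BUCKET = "topicbucket001"  # bucket name from the connector config above


def s3_sink_policy(bucket: str) -> dict:
    """Build an IAM policy document granting S3-sink-style access to one bucket.

    Assumption: the action list approximates what the Confluent S3 sink needs;
    verify it against the connector documentation before using it.
    """
    bucket_arn = f"arn:aws:s3:::{bucket}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Bucket-level actions: listing, region lookup, in-progress multipart uploads.
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation",
                    "s3:ListBucketMultipartUploads",
                ],
                "Resource": bucket_arn,
            },
            {
                # Object-level actions; s3:PutObject also authorizes creating,
                # uploading parts of, and completing multipart uploads.
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                ],
                "Resource": f"{bucket_arn}/*",
            },
        ],
    }


print(json.dumps(s3_sink_policy(BUCKET), indent=2))
```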
Thanks to Robin Moffatt for the solution above!