卡夫卡连接 |无法反序列化主题数据 |检索 id 的 Avro 键/值架构版本时出错 |找不到主题错误代码:40401
Kafka connect | Failed to deserialize data for topic | Error retrieving Avro key / value schema version for id | Subject not found error code: 40401
首先感谢@OneCricketeer 迄今为止的支持。到目前为止,我已经尝试了很多配置,我不知道还能尝试什么。
使用 confluent connect-standalone worker.properties sink.properties
访问外部流。
连接正常,我可以看到已加载偏移量:
INFO [my_mysql_sink|task-0] [Consumer clientId=connector-consumer-my_mysql_sink-0, groupId=connect-my_mysql_sink] Setting offset for partition gamerboot.gamer.master.workouts.clubs.spieleranalyse-1 to the committed offset FetchPosition{
offset=2225 , offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka8.pro.someurl.net:9093 (id: 8 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)
但之后当新消息进来时我收到错误消息:
ERROR [my_mysql_sink|task-0] WorkerSinkTask{id=my_mysql_sink-0} Error converting message key in topic 'gamerboot.gamer.master.workouts.clubs.spieleranalyse' partition 1 at offset 2225 and timestamp 1641459346507: Failed to deserialize data for topic gamerboot.gamer.master.workouts.clubs.spieleranalyse to Avro:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 422
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
我不明白。
worker.properties:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
sink.properties
#key.converter.enhanced.avro.schema.support=true
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://schema-reg.pro.someurl.net
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-reg.pro.someurl.net
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#pk.mode=record_key
#pk.fields=
由于mysql中没有设置pk,我想从流中记录所有内容。
如其所说“检索 ID 422 的 Avro 密钥模式版本时出错 ”,我可以看到以下内容:
screenshot_subject_id
不要像它说的那样奇怪 JSON,这只是我的 ChromePlugin 将其解释为 json。
发现同样的价值。我还尝试了 sink.properties 中的所有组合,并在此处进行了注释。
我还能够 curl 键和值的最新模式(如):
{
"type": "record",
"name": "ClubWorkoutKey",
"namespace": "com.ad.gamerboot.kafka.models.workouts",
"fields": [
{
"name": "playerId",
"type": "string"
},
{
"name": "tagId",
"type": [
"null",
"string"
],
"default": null
}
]
}
当我在 sink.properties 中输入 key.converter 和 value.converter 的字符串转换器时,情况更进一步了。但是在我看来一定有什么不对的地方,因为Avro就是在这里传递的。使用 String 然后还有其他问题,我将不得不设置一个 pk 并打开删除等
感谢支持。
*编辑:
所以,给我的是:
topic = gamerboot.gamer.master.workouts.clubs.spieleranalyse
schema.url = https://schema-reg.pro.someurl.net
以及:架构 ID url:
https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.workouts-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/schema
和:
https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest
对我来说这就像一个谜题,我 20 天前开始使用 kafka。从那里我尝试了 urls 并找到了我为主题发布的那些:
对于密钥:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest/
架构:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey","version":1,"id":422,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKey\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}
对于值:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/
和https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue/versions/latest/
模式:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue","version":1,"id":423,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKickValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ballSpeed\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ballSpeedFloat\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"ballSpeedZone\",\"type\":{\"type\":\"enum\",\"name\":\"BallSpeedZone\",\"symbols\":[\"COLD\",\"MEDIUM\",\"HOT\",\"FIRE\",\"INVALID\"]}},{\"name\":\"confidence\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}
和:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue","version":1,"id":424,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutPlayerMotionValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"absoluteDistance\",\"type\":\"float\"},{\"name\":\"averageSpeed\",\"type\":\"float\"},{\"name\":\"peakSpeed\",\"type\":\"float\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"installationId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"averageSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"AverageSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null,\"aliases\":[\"speedZone\"]},{\"name\":\"peakSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"PeakSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}
MySQL table:
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| playerid | varchar(100) | YES | | NULL | |
| timestamp | mediumtext | YES | | NULL | |
| absoluteDistance | float | YES | | NULL | |
| avarageSpeed | float | YES | | NULL | |
| peakSpeed | float | YES | | NULL | |
| tagId | varchar(50) | YES | | NULL | |
| installationId | varchar(100) | YES | | NULL | |
| averageSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES | | NULL | |
| peakSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES | | NULL | |
| ballSpeed | int(11) | YES | | NULL | |
| ballSpeedFloat | float | YES | | NULL | |
| ballSpeedZone | enum('COLD','MEDIUM','HOT','FIRE','INVALID') | YES | | NULL | |
| confidence | int(11) | YES | | NULL | |
| ingestionTime | mediumtext | YES | | NULL | |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
MySQL 中的预期数据:
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| playerid | timestamp | absoluteDistance | avarageSpeed | peakSpeed | tagId | installationId | averageSpeedZone | peakSpeedZone | ballSpeed | ballSpeedFloat | ballSpeedZone | confidence | ingestionTime |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641495873505 | 5.76953 | 1.1543 | 1.22363 | 0104FLHBN009XD | null | WALK | WALK | NULL | NULL | NULL | NULL | 1641496586458 |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641484677624 | NULL | NULL | NULL | 0104FLHBN009XD | NULL | NULL | NULL | 37 | 37.0897 | COLD | 77 | 1641484896747 |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
来自 avro-console 的数据看起来像数据库条目:
{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641484677624,"tagId":{"string":"0104FLHBN009XD"},"ballSpeed":{"int":37},"ballSpeedFloat":{"float":37.08966},"ballSpeedZone":"COLD","confidence":{"int":77},"ingestionTime":{"long":1641484896747}}
{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641495873505,"absoluteDistance":5.7695312,"averageSpeed":1.1542969,"peakSpeed":1.2236328,"tagId":{"string":"0104FLHBN009XD"},"installationId":null,"averageSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.AverageSpeedZone":"WALK"},"peakSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.PeakSpeedZone":"WALK"},"ingestionTime":{"long":1641496586458}}
这是一个全新的实际融合安装。几个小时前我将 Avro 更新为:kafka-connect-avro-converter:7.0.1
公司更改了关于 RecordNameStrategy 的架构。现在一切正常。
谢谢
首先感谢@OneCricketeer 迄今为止的支持。到目前为止,我已经尝试了很多配置,我不知道还能尝试什么。
使用 confluent connect-standalone worker.properties sink.properties
访问外部流。
连接正常,我可以看到已加载偏移量:
INFO [my_mysql_sink|task-0] [Consumer clientId=connector-consumer-my_mysql_sink-0, groupId=connect-my_mysql_sink] Setting offset for partition gamerboot.gamer.master.workouts.clubs.spieleranalyse-1 to the committed offset FetchPosition{ offset=2225 , offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka8.pro.someurl.net:9093 (id: 8 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)
但之后当新消息进来时我收到错误消息:
ERROR [my_mysql_sink|task-0] WorkerSinkTask{id=my_mysql_sink-0} Error converting message key in topic 'gamerboot.gamer.master.workouts.clubs.spieleranalyse' partition 1 at offset 2225 and timestamp 1641459346507: Failed to deserialize data for topic gamerboot.gamer.master.workouts.clubs.spieleranalyse to Avro:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 422
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
我不明白。
worker.properties:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
sink.properties
#key.converter.enhanced.avro.schema.support=true
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://schema-reg.pro.someurl.net
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-reg.pro.someurl.net
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#pk.mode=record_key
#pk.fields=
由于mysql中没有设置pk,我想从流中记录所有内容。
如其所说“检索 ID 422 的 Avro 密钥模式版本时出错 ”,我可以看到以下内容:
screenshot_subject_id
不要像它说的那样奇怪 JSON,这只是我的 ChromePlugin 将其解释为 json。 发现同样的价值。我还尝试了 sink.properties 中的所有组合,并在此处进行了注释。 我还能够 curl 键和值的最新模式(如):
{
"type": "record",
"name": "ClubWorkoutKey",
"namespace": "com.ad.gamerboot.kafka.models.workouts",
"fields": [
{
"name": "playerId",
"type": "string"
},
{
"name": "tagId",
"type": [
"null",
"string"
],
"default": null
}
]
}
当我在 sink.properties 中输入 key.converter 和 value.converter 的字符串转换器时,情况更进一步了。但是在我看来一定有什么不对的地方,因为Avro就是在这里传递的。使用 String 然后还有其他问题,我将不得不设置一个 pk 并打开删除等
感谢支持。
*编辑:
所以,给我的是:
topic = gamerboot.gamer.master.workouts.clubs.spieleranalyse
schema.url = https://schema-reg.pro.someurl.net
以及:架构 ID url:
https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.workouts-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/schema
和:
https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest
对我来说这就像一个谜题,我 20 天前开始使用 kafka。从那里我尝试了 urls 并找到了我为主题发布的那些:
对于密钥:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest/
架构:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey","version":1,"id":422,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKey\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}
对于值:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/
和https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue/versions/latest/
模式:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue","version":1,"id":423,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKickValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ballSpeed\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ballSpeedFloat\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"ballSpeedZone\",\"type\":{\"type\":\"enum\",\"name\":\"BallSpeedZone\",\"symbols\":[\"COLD\",\"MEDIUM\",\"HOT\",\"FIRE\",\"INVALID\"]}},{\"name\":\"confidence\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}
和:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue","version":1,"id":424,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutPlayerMotionValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"absoluteDistance\",\"type\":\"float\"},{\"name\":\"averageSpeed\",\"type\":\"float\"},{\"name\":\"peakSpeed\",\"type\":\"float\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"installationId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"averageSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"AverageSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null,\"aliases\":[\"speedZone\"]},{\"name\":\"peakSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"PeakSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}
MySQL table:
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| playerid | varchar(100) | YES | | NULL | |
| timestamp | mediumtext | YES | | NULL | |
| absoluteDistance | float | YES | | NULL | |
| avarageSpeed | float | YES | | NULL | |
| peakSpeed | float | YES | | NULL | |
| tagId | varchar(50) | YES | | NULL | |
| installationId | varchar(100) | YES | | NULL | |
| averageSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES | | NULL | |
| peakSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES | | NULL | |
| ballSpeed | int(11) | YES | | NULL | |
| ballSpeedFloat | float | YES | | NULL | |
| ballSpeedZone | enum('COLD','MEDIUM','HOT','FIRE','INVALID') | YES | | NULL | |
| confidence | int(11) | YES | | NULL | |
| ingestionTime | mediumtext | YES | | NULL | |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
MySQL 中的预期数据:
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| playerid | timestamp | absoluteDistance | avarageSpeed | peakSpeed | tagId | installationId | averageSpeedZone | peakSpeedZone | ballSpeed | ballSpeedFloat | ballSpeedZone | confidence | ingestionTime |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641495873505 | 5.76953 | 1.1543 | 1.22363 | 0104FLHBN009XD | null | WALK | WALK | NULL | NULL | NULL | NULL | 1641496586458 |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641484677624 | NULL | NULL | NULL | 0104FLHBN009XD | NULL | NULL | NULL | 37 | 37.0897 | COLD | 77 | 1641484896747 |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
来自 avro-console 的数据看起来像数据库条目:
{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641484677624,"tagId":{"string":"0104FLHBN009XD"},"ballSpeed":{"int":37},"ballSpeedFloat":{"float":37.08966},"ballSpeedZone":"COLD","confidence":{"int":77},"ingestionTime":{"long":1641484896747}}
{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641495873505,"absoluteDistance":5.7695312,"averageSpeed":1.1542969,"peakSpeed":1.2236328,"tagId":{"string":"0104FLHBN009XD"},"installationId":null,"averageSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.AverageSpeedZone":"WALK"},"peakSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.PeakSpeedZone":"WALK"},"ingestionTime":{"long":1641496586458}}
这是一个全新的实际融合安装。几个小时前我将 Avro 更新为:kafka-connect-avro-converter:7.0.1
公司更改了关于 RecordNameStrategy 的架构。现在一切正常。
谢谢