confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161 : ValueError
I'm new to Python and am trying to produce Avro messages with 'confluent_kafka', using 'confluent_kafka.schema_registry.avro.AvroSerializer'
(reference: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py).
It works for a simple Avro schema with dict input (JSON converted to a dict), but for the sample schema below I get an error:
Schema:
{
"type": "record",
"name": "Envelope",
"namespace": "CoreOLTPEvents.dbo.Event",
"fields": [{
"name": "before",
"type": ["null", {
"type": "record",
"name": "Value",
"fields": [{
"name": "EventId",
"type": "long"
}, {
"name": "CameraId",
"type": ["null", "long"],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Value"
}],
"default": null
}, {
"name": "after",
"type": ["null", "Value"],
"default": null
}, {
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.sqlserver",
"fields": [{
"name": "version",
"type": "string"
}, {
"name": "connector",
"type": "string"
}],
"connect.name": "io.debezium.connector.sqlserver.Source"
}
}, {
"name": "op",
"type": "string"
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}
Input JSON:
{
"after": null,
"before": {
"CoreOLTPEvents.dbo.Event.Value" : {
"EventId": 1111111111,
"CameraId": 222222222
}
},
"source": {
"version": "InitialLoad",
"connector": "sqlserver"
},
"op": "C"
}
Error:
ValueError: {'CoreOLTPEvents.dbo.Event.Value': {'EventId': 1111111111, 'CameraId': 222222222}} (type <class 'dict'>) do not match ['null', {'connect.name': 'CoreOLTPEvents.dbo.Event.Value', 'type': 'record', 'name': 'CoreOLTPEvents.dbo.Event.Value', 'fields': [{'name': 'EventId', 'type': 'long'}, {'default': None, 'name': 'CameraId', 'type': ['null', 'long']}]}] on field before
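The 'before' field's type is a union (['null', record]). If I change it to just the record (removing the union), it works.
But I need to adjust my input so that it works with the given schema.
(Note: I'm reading the JSON input with 'json.load(json_file)', so it gives me a dict.)
Any help would be appreciated.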
Update:
The actual, larger schema:
{
"type": "record",
"name": "Envelope",
"namespace": "CoreOLTPEvents.dbo.Event",
"fields": [{
"name": "before",
"type": ["null", {
"type": "record",
"name": "Value",
"fields": [{
"name": "EventId",
"type": "long"
}, {
"name": "CameraId",
"type": ["null", "long"],
"default": null
}, {
"name": "SiteId",
"type": ["null", "long"],
"default": null
}, {
"name": "VehicleId",
"type": ["null", "long"],
"default": null
}, {
"name": "EventReviewStatusID",
"type": "int"
}, {
"name": "EventTypeId",
"type": ["null", "int"],
"default": null
}, {
"name": "EventDateTime",
"type": ["null", {
"type": "string",
"connect.name": "net.smartdrive.converters.SmartdriveEventDateFieldConverter"
}],
"default": null
}, {
"name": "FTPUploadDateTime",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}
}, {
"name": "CAMFileName",
"type": "string"
}, {
"name": "KeypadEntryCode",
"type": ["null", "string"],
"default": null
}, {
"name": "IsActive",
"type": {
"type": "boolean",
"connect.default": true
},
"default": true
}, {
"name": "Flagged",
"type": "boolean"
}, {
"name": "EventTitle",
"type": ["null", "string"],
"default": null
}, {
"name": "CreatedBy",
"type": "long"
}, {
"name": "CreatedDate",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}
}, {
"name": "ModifiedBy",
"type": "long"
}, {
"name": "ModifiedDate",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}
}, {
"name": "ReReviewAnalysis",
"type": ["null", "string"],
"default": null
}, {
"name": "LegacyEventId",
"type": ["null", "long"],
"default": null
}, {
"name": "TripId",
"type": ["null", "long"],
"default": null
}, {
"name": "FileVersion",
"type": ["null", "string"],
"default": null
}, {
"name": "EventNumber",
"type": ["null", "string"],
"default": null
}, {
"name": "Latitude",
"type": ["null", {
"type": "bytes",
"scale": 10,
"precision": 13,
"connect.version": 1,
"connect.parameters": {
"scale": "10",
"connect.decimal.precision": "13"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}, {
"name": "Longitude",
"type": ["null", {
"type": "bytes",
"scale": 10,
"precision": 13,
"connect.version": 1,
"connect.parameters": {
"scale": "10",
"connect.decimal.precision": "13"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}, {
"name": "GeoAddressId",
"type": ["null", "long"],
"default": null
}, {
"name": "ReviewedEventId",
"type": ["null", "long"],
"default": null
}, {
"name": "VideoStatus",
"type": {
"type": "int",
"connect.default": 0
},
"default": 0
}, {
"name": "PredictionImportance",
"type": ["null", {
"type": "bytes",
"scale": 10,
"precision": 15,
"connect.version": 1,
"connect.parameters": {
"scale": "10",
"connect.decimal.precision": "15"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}, {
"name": "FlaggedBy",
"type": ["null", "long"],
"default": null
}, {
"name": "FlaggedDate",
"type": ["null", {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}],
"default": null
}, {
"name": "TriggerTypeId",
"type": ["null", "int"],
"default": null
}, {
"name": "VideoDeleteDate",
"type": ["null", {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}],
"default": null
}, {
"name": "MetadataDeleteDate",
"type": ["null", {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}],
"default": null
}, {
"name": "RetentionStatus",
"type": {
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"default": 0
}, {
"name": "PartnerTriggerId",
"type": ["null", "int"],
"default": null
}, {
"name": "CoachingStateId",
"type": {
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"default": 0
}, {
"name": "EventKudoHistoryId",
"type": ["null", "int"],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Value"
}],
"default": null
}, {
"name": "after",
"type": ["null", "Value"],
"default": null
}, {
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.sqlserver",
"fields": [{
"name": "version",
"type": "string"
}, {
"name": "connector",
"type": "string"
}, {
"name": "name",
"type": "string"
}, {
"name": "ts_ms",
"type": "long"
}, {
"name": "snapshot",
"type": [{
"type": "string",
"connect.version": 1,
"connect.parameters": {
"allowed": "true,last,false"
},
"connect.default": "false",
"connect.name": "io.debezium.data.Enum"
}, "null"],
"default": "false"
}, {
"name": "db",
"type": "string"
}, {
"name": "schema",
"type": "string"
}, {
"name": "table",
"type": "string"
}, {
"name": "change_lsn",
"type": ["null", "string"],
"default": null
}, {
"name": "commit_lsn",
"type": ["null", "string"],
"default": null
}, {
"name": "event_serial_no",
"type": ["null", "long"],
"default": null
}],
"connect.name": "io.debezium.connector.sqlserver.Source"
}
}, {
"name": "op",
"type": "string"
}, {
"name": "ts_ms",
"type": ["null", "long"],
"default": null
}, {
"name": "transaction",
"type": ["null", {
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [{
"name": "id",
"type": "string"
}, {
"name": "total_order",
"type": "long"
}, {
"name": "data_collection_order",
"type": "long"
}]
}],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}
Input for the larger schema:
{
"before": null,
"after": {
"EventId": 1234566,
"CameraId": 2233,
"SiteId": 111,
"VehicleId": 45587,
"EventReviewStatusID": 10,
"EventTypeId": 123,
"EventDateTime": "2015-01-02T01:30:29Z",
"FTPUploadDateTime": 1420193330590,
"CAMFileName": "XYZ",
"KeypadEntryCode": "0",
"IsActive": false,
"Flagged": false,
"EventTitle": null,
"CreatedBy": 1,
"CreatedDate": 1420191120730,
"ModifiedBy": 1,
"ModifiedDate": 1577871185680,
"ReReviewAnalysis": null,
"LegacyEventId": null,
"TripId": 3382,
"FileVersion": "2.2",
"EventNumber": "AAAA-BBBB",
"Latitude": "UU9elrA=",
"Longitude": "/ueZUeFw",
"GeoAddressId": null,
"ReviewedEventId": 129411077,
"VideoStatus": 4,
"PredictionImportance": 0.1402457539,
"FlaggedBy": null,
"FlaggedDate": null,
"TriggerTypeId": 322,
"VideoDeleteDate": 1422783120000,
"MetadataDeleteDate": 1577871120000,
"RetentionStatus": 15,
"PartnerTriggerId": null,
"CoachingStateId": 0,
"EventKudoHistoryId": null
},
"source": {
"version": "Final",
"connector": "sqlserver",
"name": "CoreOLTP",
"ts_ms": 1615813992548,
"snapshot": "false",
"db": "CoreOLTP",
"schema": "dbo",
"table": "xyz",
"change_lsn": null,
"commit_lsn": null,
"event_serial_no": null
},
"op": "C",
"ts_ms": 1615813992548,
"transaction": null
}
Error:
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="{'EventId': 129411077, 'CameraId': 46237, 'SiteId': 2148, 'VehicleId': 45587, 'EventReviewStatusID': 10, 'EventTypeId': 247, 'EventDateTime': '2015-01-02T01:30:29Z', 'FTPUploadDateTime': 1420191120590, 'CAMFileName': 'JD2BC02120150102013029ER.SDE', 'KeypadEntryCode': '0', 'IsActive': False, 'Flagged': False, 'EventTitle': None, 'CreatedBy': 1, 'CreatedDate': 1420191120730, 'ModifiedBy': 1, 'ModifiedDate': 1577871185680, 'ReReviewAnalysis': None, 'LegacyEventId': None, 'TripId': 3382, 'FileVersion': '2.2', 'EventNumber': 'WSHX-8QQ2', 'Latitude': 'UU9elrA=', 'Longitude': '/ueZUeFw', 'GeoAddressId': None, 'ReviewedEventId': 129411077, 'VideoStatus': 4, 'PredictionImportance': 0.1402457539, 'FlaggedBy': None, 'FlaggedDate': None, 'TriggerTypeId': 322, 'VideoDeleteDate': 1422783120000, 'MetadataDeleteDate': 1577871120000, 'RetentionStatus': 15, 'PartnerTriggerId': None, 'CoachingStateId': 0, 'EventKudoHistoryId': None} (type <class 'dict'>) do not match ['null', 'CoreOLTPEvents.dbo.Event.Value'] on field after"}
You just need to change the input so that the before field doesn't include the namespace. It needs to look like this:
{
"after": null,
"before": {
"EventId": 1111111111,
"CameraId": 222222222
},
"source": {
"version": "InitialLoad",
"connector": "sqlserver"
},
"op": "C"
}
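Since the input is read with json.load, the same fix can also be applied in code instead of by editing the file. Below is a minimal sketch, assuming the only wrapped value is the before field; strip_branch_wrapper is a hypothetical helper (not part of confluent_kafka or fastavro) that unwraps just the one field you name and leaves everything else as loaded.

```python
import json


def strip_branch_wrapper(record, field, branch):
    """Hypothetical helper: if record[field] is {branch: value}, replace it with value.

    Only the named field is touched; the rest of the dict is left as loaded.
    """
    value = record.get(field)
    if isinstance(value, dict) and list(value) == [branch]:
        record[field] = value[branch]
    return record


# Same content as the question's input; in real code this would be json.load(json_file).
raw = json.loads("""{"after": null,
  "before": {"CoreOLTPEvents.dbo.Event.Value": {"EventId": 1111111111, "CameraId": 222222222}},
  "source": {"version": "InitialLoad", "connector": "sqlserver"},
  "op": "C"}""")
record = strip_branch_wrapper(raw, "before", "CoreOLTPEvents.dbo.Event.Value")
```

The resulting record has the same shape as the corrected input above and can be passed to the serializer as-is.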
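Your original input looks like Avro-encoded JSON, because the before field carries the CoreOLTPEvents.dbo.Event.Value namespace. However, I'd guess it was hand-crafted, because CameraId is also a union (["null", "long"]), so in that encoding it would have to be written as {"long": 222222222} rather than just 222222222.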
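If you really do have Avro-encoded JSON (for example, the output of some other process), you can use something like fastavro.json_reader to read the file; it will produce the correct in-memory representation (without the type information for union fields).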
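fastavro.json_reader is the tool to reach for here. To show what that conversion amounts to, here is a minimal stdlib-only sketch of the same idea, assuming Avro's JSON encoding rule that a non-null union value is wrapped as {branch_name: value}, where branch_name is the primitive type name or the full name of a named type. It only handles records, unions, arrays, maps and primitives, and the function names (collect_named, from_avro_json) are hypothetical, not part of any library. It is deliberately strict: the hand-made input from the question, with a bare 222222222 for CameraId, is rejected.

```python
PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def _full_name(name, namespace):
    # A dotted name is already a full name; otherwise qualify it with the namespace.
    return name if "." in name or not namespace else f"{namespace}.{name}"


def collect_named(schema, namespace="", named=None):
    """Index every record/enum/fixed definition in the schema by its full name."""
    named = {} if named is None else named
    if isinstance(schema, list):  # union: visit every branch
        for branch in schema:
            collect_named(branch, namespace, named)
    elif isinstance(schema, dict):
        kind = schema["type"]
        if kind in ("record", "enum", "fixed"):
            full = _full_name(schema["name"], schema.get("namespace", namespace))
            named[full] = schema
            namespace = full.rpartition(".")[0]  # nested types inherit this namespace
        if kind == "record":
            for field in schema["fields"]:
                collect_named(field["type"], namespace, named)
        elif kind == "array":
            collect_named(schema["items"], namespace, named)
        elif kind == "map":
            collect_named(schema["values"], namespace, named)
    return named


def _branch_name(branch, namespace):
    """The key Avro's JSON encoding uses for a given union branch."""
    if isinstance(branch, str):
        return branch if branch in PRIMITIVES else _full_name(branch, namespace)
    kind = branch["type"]
    if kind in ("record", "enum", "fixed"):
        return _full_name(branch["name"], branch.get("namespace", namespace))
    return kind  # "array", "map", or an annotated primitive like {"type": "long", ...}


def from_avro_json(datum, schema, named, namespace=""):
    """Turn an Avro-JSON-encoded datum into plain dicts/lists/scalars."""
    if isinstance(schema, str):
        if schema in PRIMITIVES:
            return datum
        full = _full_name(schema, namespace)  # named reference such as "Value"
        schema, namespace = named[full], full.rpartition(".")[0]
    if isinstance(schema, list):  # union
        if datum is None:
            return None
        if not (isinstance(datum, dict) and len(datum) == 1):
            raise ValueError(f"expected a one-key union wrapper, got {datum!r}")
        (branch, value), = datum.items()
        for candidate in schema:
            if _branch_name(candidate, namespace) == branch:
                return from_avro_json(value, candidate, named, namespace)
        raise ValueError(f"{branch!r} is not a branch of {schema!r}")
    kind = schema["type"]
    if kind == "record":
        full = _full_name(schema["name"], schema.get("namespace", namespace))
        inner = full.rpartition(".")[0]
        # Missing fields are left out so the serializer can apply schema defaults.
        return {
            f["name"]: from_avro_json(datum[f["name"]], f["type"], named, inner)
            for f in schema["fields"]
            if f["name"] in datum
        }
    if kind == "array":
        return [from_avro_json(item, schema["items"], named, namespace) for item in datum]
    if kind == "map":
        return {k: from_avro_json(v, schema["values"], named, namespace) for k, v in datum.items()}
    return datum  # enum, fixed, or an annotated primitive
```

Usage would be along the lines of record = from_avro_json(json.load(json_file), schema, collect_named(schema)), with schema being the parsed schema dict.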
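Update:
To find out what was wrong with the full schema and full data, I first loaded both objects with json.load and then ran fastavro.validate(record, schema).
The output was a stack trace ending with this: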
fastavro._validate_common.ValidationError: [
"CoreOLTPEvents.dbo.Event.Envelope.after is <{'EventId': 1234566, 'CameraId': 2233, 'SiteId': 111, 'VehicleId': 45587, 'EventReviewStatusID': 10, 'EventTypeId': 123, 'EventDateTime': '2015-01-02T01:30:29Z', 'FTPUploadDateTime': 1420193330590, 'CAMFileName': 'XYZ', 'KeypadEntryCode': '0', 'IsActive': False, 'Flagged': False, 'EventTitle': None, 'CreatedBy': 1, 'CreatedDate': 1420191120730, 'ModifiedBy': 1, 'ModifiedDate': 1577871185680, 'ReReviewAnalysis': None, 'LegacyEventId': None, 'TripId': 3382, 'FileVersion': '2.2', 'EventNumber': 'AAAA-BBBB', 'Latitude': 'UU9elrA=', 'Longitude': '/ueZUeFw', 'GeoAddressId': None, 'ReviewedEventId': 129411077, 'VideoStatus': 4, 'PredictionImportance': 0.1402457539, 'FlaggedBy': None, 'FlaggedDate': None, 'TriggerTypeId': 322, 'VideoDeleteDate': 1422783120000, 'MetadataDeleteDate': 1577871120000, 'RetentionStatus': 15, 'PartnerTriggerId': None, 'CoachingStateId': 0, 'EventKudoHistoryId': None}> of type <class 'dict'> expected null",
"CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected null",
"CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected {'scale': 10, 'precision': 13, 'connect.version': 1, 'connect.parameters': {'scale': '10', 'connect.decimal.precision': '13'}, 'connect.name': 'org.apache.kafka.connect.data.Decimal', 'logicalType': 'decimal', 'type': 'bytes'}"
]
This is telling us there are 3 potential problems. The first is that the value of after doesn't match null, but we can ignore that, since we don't expect after to be null.
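The other two are the real problems. They say the value of Latitude is the string UU9elrA=, which matches neither null nor bytes. The string looks base64-encoded, so perhaps you have code that decodes it to bytes. If so, the actual problem may be something else, but in that case you should still be able to use fastavro.validate to pin down what it is.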
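If those strings are Kafka Connect Decimal values (the schema's connect.name is org.apache.kafka.connect.data.Decimal, and Kafka Connect's JSON converter writes Decimal as base64 of the unscaled value by default), the conversion can be sketched with only the standard library, as below. This assumes the bytes are a big-endian two's-complement unscaled integer and takes the scale (10) from the schema. Either the raw decoded bytes or a decimal.Decimal should satisfy the bytes/decimal branch; this version returns Decimal because it is easier to check by eye: UU9elrA= and /ueZUeFw come out as 34.922395 and -120.431353, which look like plausible coordinates, so the encoding guess is probably right. Longitude has the same problem, and PredictionImportance (also bytes/decimal) arrives as a plain JSON number; Longitude wasn't reported, so the validator presumably stops at the first failing field. connect_decimal and load_event are hypothetical names.

```python
import base64
import json
from decimal import Decimal


def connect_decimal(encoded, scale):
    """Decode a base64 Kafka Connect Decimal into decimal.Decimal.

    The decoded bytes are read as a big-endian two's-complement unscaled
    integer; the scale comes from the schema (10 for Latitude/Longitude here).
    """
    unscaled = int.from_bytes(base64.b64decode(encoded), "big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def load_event(text):
    """Hypothetical loader for the larger schema's input."""
    # parse_float=Decimal turns every JSON float (here only PredictionImportance)
    # into a Decimal with its exact digits instead of a binary float.
    record = json.loads(text, parse_float=Decimal)
    after = record.get("after")
    if after is not None:
        for name in ("Latitude", "Longitude"):  # base64 strings in the input
            if after.get(name) is not None:
                after[name] = connect_decimal(after[name], scale=10)
    return record
```

With a file, the equivalent call is json.load(json_file, parse_float=Decimal). Running fastavro.validate again on the converted record is a quick way to confirm nothing else is off.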