org.apache.kafka.common.errors.SerializationException: Failed to deserialize key

I am trying to create a TABLE in KSQL on top of a Kafka topic. The table is created successfully, but when I try to select data from it, I get this error:

org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT

Data in the topic:

Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/11/15 14:03:55.013 Z, key: {"MESG_SEQ_NO":1015}, value: {"MESG_SEQ_NO":1015,"MESSAGENO":"1015","MESSAGECREATIONDATETIME":1554336000000,"OPCO_GLN":"L","OPCO_COUNTRYCODE":"L","STORENO":5809,"SRVPNO":320,"ENDDATETIMETRANSACTION":1554336000000,"DATETIMESENTSTORE":1554336000000,"MESG_ACTION":"I","MESG_IND_COMPLETED":"N","MESG_IND_SENTTOBROKER":"N","MESG_ARTCLE_SALES_SEQ_NO":1015,"NASANUMBER":584623,"LKARNUMBER":null,"AMOUNTCE":1,"AMOUNTWEIGHT":null,"VOLUME":null,"AMOUNTCEDISCOUNT":null,"AMOUNTCEPRICED":null,"AMOUNTWEIGHTDISCOUNT":null,"AMOUNTWEIGHTPRICED":null,"SALEVALUE":null,"DISCOUNTVALUE":null,"PROMOTIONVALUE":null}, partition: 0

Table structure:

ksql> CREATE TABLE MSG_2639_SALES_TRNS_STORES_TABLE_S( MESG_SEQ_NO INT(4,0) PRIMARY KEY, MESSAGENO STRING, MESSAGECREATIONDATETIME TIMESTAMP,
>OPCO_GLN STRING, OPCO_COUNTRYCODE STRING, STORENO INT(4,0), SRVPNO INT(4,0),
>ENDDATETIMETRANSACTION TIMESTAMP, DATETIMESENTSTORE TIMESTAMP, MESG_ACTION STRING, MESG_IND_COMPLETED STRING, MESG_IND_SENTTOBROKER STRING)
>with (kafka_topic='ah_topics_2639_sales' , key_format='JSON',  value_format='JSON');
 Message
---------------
 Table created
---------------


Selecting data:

ksql> select * from MSG_2639_SALES_TRNS_STORES_TABLE_S emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|MESG_SEQ_NO |MESSAGENO   |MESSAGECREAT|OPCO_GLN    |OPCO_COUNTRY|STORENO     |SRVPNO      |ENDDATETIMET|DATETIMESENT|MESG_ACTION |MESG_IND_COM|MESG_IND_SEN|
|            |            |IONDATETIME |            |CODE        |            |            |RANSACTION  |STORE       |            |PLETED      |TTOBROKER   |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+

The query does not return any data, and I see this error in the log:

org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT

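You declared MESG_SEQ_NO INT(4,0) PRIMARY KEY, but the record key, key: {"MESG_SEQ_NO":1015}, is a struct (a JSON object), not an integer. ksqlDB does not automatically look inside the object and match a JSON field to the key column by name.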

Hence the error: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT. The deserializer found a JSON object where it expected a bare number.
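One way to make the declared key match the data is to type the key column as a STRUCT. The following is a minimal sketch, not a tested fix: it assumes a ksqlDB version that supports STRUCT key columns with key_format='JSON', and the table name MSG_2639_SALES_STRUCT_KEY and column name MSG_KEY are hypothetical. Only a few of the value columns are shown.

```sql
-- Sketch: declare the key as a STRUCT so the JSON object
-- {"MESG_SEQ_NO":1015} in the record key can be deserialized.
-- MSG_2639_SALES_STRUCT_KEY and MSG_KEY are hypothetical names.
CREATE TABLE MSG_2639_SALES_STRUCT_KEY (
  MSG_KEY STRUCT<MESG_SEQ_NO BIGINT> PRIMARY KEY,
  MESG_SEQ_NO BIGINT,   -- same number, read from the record value
  MESSAGENO STRING,
  STORENO INT,
  SRVPNO INT
) WITH (
  kafka_topic='ah_topics_2639_sales',
  key_format='JSON',
  value_format='JSON'
);

-- The field inside the struct key is reachable with the -> operator.
SELECT MSG_KEY, MSG_KEY->MESG_SEQ_NO AS KEY_SEQ_NO, MESSAGENO, STORENO
FROM MSG_2639_SALES_STRUCT_KEY
EMIT CHANGES;
```

With this declaration the key column type lines up with the ObjectNode that the deserializer actually sees, so the BIGINT coercion should no longer be attempted on the whole object.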

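Last I checked, keys in ksql should ideally be primitive types rather than structured objects. If the key is structured, you need to declare that explicitly - https://docs.ksqldb.io/en/latest/reference/serialization/#deserialization-of-single-keys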
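If you would rather end up with a primitive key, one possible approach is to read the topic as a stream with the struct key and then derive a table keyed by the number from the value. This is a sketch under assumptions: SALES_RAW, SALES_BY_SEQ, and MSG_KEY are hypothetical names, only some columns are shown, and it assumes keeping the latest value per MESG_SEQ_NO is the semantics you want.

```sql
-- Sketch: read the topic as a stream, keeping the struct key as declared.
-- SALES_RAW and MSG_KEY are hypothetical names.
CREATE STREAM SALES_RAW (
  MSG_KEY STRUCT<MESG_SEQ_NO BIGINT> KEY,
  MESG_SEQ_NO BIGINT,
  MESSAGENO STRING,
  STORENO INT
) WITH (
  kafka_topic='ah_topics_2639_sales',
  key_format='JSON',
  value_format='JSON'
);

-- Derive a table whose key is the primitive MESG_SEQ_NO taken from the value.
-- LATEST_BY_OFFSET keeps the most recent value seen for each key.
CREATE TABLE SALES_BY_SEQ AS
  SELECT MESG_SEQ_NO,
         LATEST_BY_OFFSET(MESSAGENO) AS MESSAGENO,
         LATEST_BY_OFFSET(STORENO) AS STORENO
  FROM SALES_RAW
  GROUP BY MESG_SEQ_NO
  EMIT CHANGES;
```

The derived table is backed by a new topic that ksqlDB writes itself, so its key is serialized as a single value rather than as a wrapping object.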