Kafka: All messages failing in stream while data is in the topic
I have a topic, post_users_t, and when I run the PRINT command on it I get:
rowtime: 4/2/20 2:03:48 PM UTC, key: <null>, value: {"userid": 6, "id": 8, "title": "testest", "body": "Testingmoreand more"}
rowtime: 4/2/20 2:03:48 PM UTC, key: <null>, value: {"userid": 7, "id": 11, "title": "testest", "body": "Testingmoreand more"}
Then I created a stream:
CREATE STREAM userstream (userid INT, id INT, title VARCHAR, body VARCHAR)
WITH (KAFKA_TOPIC='post_users_t',
VALUE_FORMAT='JSON');
But I can't get anything out of it with SELECT, and when I run DESCRIBE EXTENDED on it, every message has failed:
consumer-messages-per-sec: 1.06 consumer-total-bytes: 116643 consumer-total-messages: 3417 last-message: 2020-04-02T14:08:08.546Z
consumer-failed-messages: 3417 consumer-failed-messages-per-sec: 1.06 last-failed: 2020-04-02T14:08:08.56Z
What am I doing wrong?
More information below!
Printing the topic from the beginning:
ksql> print 'post_users_t' from beginning limit 2;
Key format: SESSION(AVRO) or HOPPING(AVRO) or TUMBLING(AVRO) or AVRO or SESSION(PROTOBUF) or HOPPING(PROTOBUF) or TUMBLING(PROTOBUF) or PROTOBUF or SESSION(JSON) or HOPPING(JSON) or TUMBLING(JSON) or JSON or SESSION(JSON_SR) or HOPPING(JSON_SR) or TUMBLING(JSON_SR) or JSON_SR or SESSION(KAFKA_INT) or HOPPING(KAFKA_INT) or TUMBLING(KAFKA_INT) or KAFKA_INT or SESSION(KAFKA_BIGINT) or HOPPING(KAFKA_BIGINT) or TUMBLING(KAFKA_BIGINT) or KAFKA_BIGINT or SESSION(KAFKA_DOUBLE) or HOPPING(KAFKA_DOUBLE) or TUMBLING(KAFKA_DOUBLE) or KAFKA_DOUBLE or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 4/2/20 1:04:08 PM UTC, key: <null>, value: {"userid": 1, "id": 1, "title": "loremit", "body": "loremit heiluu ja paukkuu"}
rowtime: 4/2/20 1:04:08 PM UTC, key: <null>, value: {"userid": 2, "id": 2, "title": "lorbe", "body": "larboloilllaaa"}
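According to the output of ksqlDB's inspection of the topic, your data is serialized as Avro. PRINT shows the decoded records as JSON-like text, which is why the values look like JSON: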
Value format: AVRO or KAFKA_STRING
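But you created the STREAM specifying VALUE_FORMAT='JSON'. This causes deserialization errors, which you will see written out if you run docker-compose logs -f ksqldb-server while trying to query the stream.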
Because you are using Avro, you don't need to specify the schema; ksqlDB reads it from the Schema Registry. Try this:
CREATE STREAM userstream
WITH (KAFKA_TOPIC='post_users_t',
VALUE_FORMAT='AVRO');
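As a rough follow-up sketch (not part of the original answer), the statements below show one way to replace the JSON-declared stream and check that rows now come through. It assumes the existing userstream has no queries depending on it and that ksqlDB can reach the Schema Registry holding the topic's value schema.

```sql
-- Remove the stream that was declared with the wrong value format.
-- This drops only the ksqlDB stream; the Kafka topic is left in place.
DROP STREAM userstream;

-- Recreate it; the columns are inferred from the Avro schema in Schema Registry.
CREATE STREAM userstream
  WITH (KAFKA_TOPIC='post_users_t',
        VALUE_FORMAT='AVRO');

-- Check which columns were inferred.
DESCRIBE userstream;

-- Read from the start of the topic rather than only newly arriving messages.
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM userstream EMIT CHANGES LIMIT 2;
```

If the SELECT returns rows, running DESCRIBE EXTENDED on the new stream should no longer show consumer-failed-messages climbing alongside consumer-total-messages.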