ClickHouse error: DB::Exception: Pos in empty own buffer, Stack trace (when copying this message, always include the lines below)

Can anyone help me figure out this error? I don't understand what it means or how to fix it. ClickHouse version 21.7.2.7.

I have several partitions in Kafka. When I create a Kafka engine table and attach a materialized view to it, I run into this error. The interesting behaviour is that the Kafka engine keeps ingesting data from some partitions while skipping the others.

<Error> void DB::StorageKafka::threadFunc(size_t): Code: 49, e.displayText() = DB::Exception: Pos in empty own buffer, Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x8d35c1a in /usr/bin/clickhouse
1. DB::PeekableReadBuffer::checkStateCorrect() const @ 0x106bc556 in /usr/bin/clickhouse
2. DB::JSONAsStringRowInputFormat::resetParser() @ 0x1078adf3 in /usr/bin/clickhouse
3. DB::KafkaBlockInputStream::readImpl() @ 0x1026fd2d in /usr/bin/clickhouse
4. DB::IBlockInputStream::read() @ 0xf51faa4 in /usr/bin/clickhouse
5. DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<void (DB::Block const&)> const&, std::__1::atomic<bool>*) @ 0xf542cf5 in /usr/bin/clickhouse
6. DB::StorageKafka::streamToViews() @ 0x10264279 in /usr/bin/clickhouse
7. DB::StorageKafka::threadFunc(unsigned long) @ 0x10262e58 in /usr/bin/clickhouse
8. DB::BackgroundSchedulePoolTaskInfo::execute() @ 0xf772a40 in /usr/bin/clickhouse
9. DB::BackgroundSchedulePool::threadFunction() @ 0xf774ab7 in /usr/bin/clickhouse
10. ? @ 0xf775834 in /usr/bin/clickhouse
11. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x8d76adf in /usr/bin/clickhouse
12. ? @ 0x8d7a3c3 in /usr/bin/clickhouse
13. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so
14. __clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so
 (version 21.7.2.7 (official build))

Kafka engine table:

CREATE TABLE IF NOT EXISTS db.kafka_table
(
    `message` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:29092,host2:29092,host3:29092',
    kafka_topic_list = 'topic',
    kafka_group_name = 'group',
    kafka_format = 'JSONAsString',
    kafka_num_consumers = 3,
    kafka_max_block_size = 10000,
    kafka_poll_max_batch_size = 10000,
    kafka_thread_per_consumer = 1,
    kafka_handle_error_mode = 'stream';
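
The materialized view attached to this table is not shown here; a minimal sketch of the kind of view meant above could look like this (db.target_table, db.kafka_mv and the MergeTree definition are assumptions for illustration, not the original objects):

-- Plain destination table that stores the raw messages.
CREATE TABLE IF NOT EXISTS db.target_table
(
    `message` String
)
ENGINE = MergeTree
ORDER BY tuple();

-- Pushes every block read from the Kafka consumer into db.target_table.
CREATE MATERIALIZED VIEW IF NOT EXISTS db.kafka_mv TO db.target_table AS
SELECT message
FROM db.kafka_table;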

Try the LineAsString format instead of kafka_format = 'JSONAsString' (see https://clickhouse.com/docs/en/single/#lineasstring for details). Most likely your input cannot be parsed as JSON (the input contains \r or \n or something similar), so try a more forgiving input format such as a plain string or blob. A minimal sketch of the change follows below.
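
Assuming the consumer table can be dropped and re-created, the only line that needs to change in the definition above is the format setting (a sketch, not tested against this topic):

DROP TABLE IF EXISTS db.kafka_table;

-- Same definition as before, with only kafka_format changed.
CREATE TABLE db.kafka_table
(
    `message` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:29092,host2:29092,host3:29092',
    kafka_topic_list = 'topic',
    kafka_group_name = 'group',
    kafka_format = 'LineAsString',
    kafka_num_consumers = 3,
    kafka_max_block_size = 10000,
    kafka_poll_max_batch_size = 10000,
    kafka_thread_per_consumer = 1,
    kafka_handle_error_mode = 'stream';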

The problem turned out to be with the data placed in Kafka. It looks like this:

"{\r\n\t\"field_1\": \"value_1\", ..., \r\n\t\"field_n\": \"value_n\"\r\n}"

So the ClickHouse Kafka engine could not parse it and commit the offsets, which is what caused the large consumer lag. Changing the input format fixed the problem, but I had to add supplementary JSON validation in the materialized view attached to the Kafka engine table (see the sketch after the SQL below).

Working SQL:

CREATE TABLE IF NOT EXISTS db.kafka_table
(
    `message` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'host:29092,host2:29092,host3:29092',
    kafka_topic_list = 'topic',
    kafka_group_name = 'group',
    kafka_format = 'RawBLOB',
    kafka_num_consumers = 3,
    kafka_max_block_size = 10000,
    kafka_poll_max_batch_size = 10000,
    kafka_thread_per_consumer = 1,
    kafka_handle_error_mode = 'stream';
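
And a sketch of the supplementary JSON validation mentioned above, done in the materialized view (db.target_table, db.kafka_mv and the WHERE condition are illustrative assumptions; the real view may extract individual fields instead):

-- Only rows whose payload is valid JSON reach the target table;
-- broken messages are silently dropped at this point.
CREATE MATERIALIZED VIEW IF NOT EXISTS db.kafka_mv TO db.target_table AS
SELECT message
FROM db.kafka_table
WHERE isValidJSON(message);

Since kafka_handle_error_mode = 'stream' is set, the _raw_message and _error virtual columns should also be available on the consumer table and could be used to route unparseable messages to a separate table instead of dropping them.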