Clickhouse 不通过复杂的物化视图使用 Kafka 消息

Clickhouse not consuming Kafka messages via complex Materialized View

TLDR 摘要:Clickhouse Kafka 引擎,实体化视图无法处理复杂的 select 语句。

更长的版本:

我正在尝试使用 JSONEachRow 通过其 Kafka 引擎向 Clickhouse 发送大量 JSON 数据点。但是物化视图不会正确使用流。 我有一个用go编写的kafka生产者,它从多个tcp流中获取数据并异步写入kafka队列。

数据流向:

TCP Sources -> Producer -> Kafka -> Clickhouse(Kafka Engine) -> Materialized View -> Destination Table

所有这一切都有效,到目前为止一切顺利。

当我提高输入数据的速度(400,000 points/sec)时,我首先遇到了瓶颈,我的生产者无法足够快地写入 kafka,连接堆积起来。所以我希望尝试对数据进行批处理,但 Clickhouse 似乎无法将 json 的数组作为输入 (https://clickhouse.yandex/docs/en/interfaces/formats/)

所以我想到了在源头对数据点进行批处理并在物化视图中转换消息的想法,所以之前我有很多单独的消息:

{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}" }

我现在有一条消息是上面的倍数并被字符串化,两点之间有换行符。

{"realtimes":"{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}\n{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}"}

这里的目的是在实体化视图的select语句中使用visitParamExtract将字符串解析并转换为多个值。

物化视图:

CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT 
    visitParamExtractInt(x, 't') AS timestamp, 
    visitParamExtractString(x, 'i') AS device_id, 
    visitParamExtractInt(x, 'v') AS value FROM  (
    SELECT arrayJoin(*) AS x
    FROM 
    (
        SELECT splitByChar('\n', realtimes)
        FROM kafka_stream_realtimes 
    )  )

它似乎在做某事,因为当它处于 运行 时,kafka_stream_realtimes 被清除,我无法手动查询它得到一个错误 "DB::Exception: Failed to claim consumer: ." 但数据从未命中最终 table.

总结:

完整性: kafka_stream_realimes:

CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
  ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');

ltdb:

CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;

but it seems Clickhouse cannot take an array of json as input

似乎动机是在生产者端进行批量提交。为什么不将一堆 JSON 行分组并一次性提交? ClickHouse 将接收这些多行消息并为您解析它们。您可能还需要向 Kafka 引擎提供 kafka_row_delimiter 设置,因为大多数 Kafka 生产者不会在每条消息的末尾附加行分隔符。

所以一条消息变成了

{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
...