Clickhouse 不通过复杂的物化视图使用 Kafka 消息
Clickhouse not consuming Kafka messages via complex Materialized View
TLDR 摘要:Clickhouse Kafka 引擎,实体化视图无法处理复杂的 select 语句。
更长的版本:
我正在尝试使用 JSONEachRow 通过其 Kafka 引擎向 Clickhouse 发送大量 JSON 数据点。但是物化视图不会正确使用流。
我有一个用go编写的kafka生产者,它从多个tcp流中获取数据并异步写入kafka队列。
数据流向:
TCP Sources -> Producer -> Kafka -> Clickhouse(Kafka Engine) -> Materialized View ->
Destination Table
所有这一切都有效,到目前为止一切顺利。
当我提高输入数据的速度(400,000 points/sec)时,我首先遇到了瓶颈,我的生产者无法足够快地写入 kafka,连接堆积起来。所以我希望尝试对数据进行批处理,但 Clickhouse 似乎无法将 json 的数组作为输入 (https://clickhouse.yandex/docs/en/interfaces/formats/)
所以我想到了在源头对数据点进行批处理并在物化视图中转换消息的想法,所以之前我有很多单独的消息:
{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}" }
我现在有一条消息是上面的倍数并被字符串化,两点之间有换行符。
{"realtimes":"{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}\n{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}"}
这里的目的是在实体化视图的select语句中使用visitParamExtract将字符串解析并转换为多个值。
物化视图:
CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT
visitParamExtractInt(x, 't') AS timestamp,
visitParamExtractString(x, 'i') AS device_id,
visitParamExtractInt(x, 'v') AS value FROM (
SELECT arrayJoin(*) AS x
FROM
(
SELECT splitByChar('\n', realtimes)
FROM kafka_stream_realtimes
) )
它似乎在做某事,因为当它处于 运行 时,kafka_stream_realtimes 被清除,我无法手动查询它得到一个错误 "DB::Exception: Failed to claim consumer: ." 但数据从未命中最终 table.
总结:
- 数据到达 clickhouse,它只是消失了,似乎永远不会消失
到达决赛 table.
- 如果我删除物化视图,我可以看到数据在
kafka_stream_realtimes
- 如果我运行实体化视图查询作为INSERT INTO语句
接着是 select,它会从流中获取数据到
最终 table.
- 我意识到我可能只是将瓶颈推到了 clickhouse
这可能永远行不通,但我想通过它
完整性
完整性:
kafka_stream_realimes:
CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');
ltdb:
CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;
but it seems Clickhouse cannot take an array of json as input
似乎动机是在生产者端进行批量提交。为什么不将一堆 JSON 行分组并一次性提交? ClickHouse 将接收这些多行消息并为您解析它们。您可能还需要向 Kafka 引擎提供 kafka_row_delimiter
设置,因为大多数 Kafka 生产者不会在每条消息的末尾附加行分隔符。
所以一条消息变成了
{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
...
TLDR 摘要:Clickhouse Kafka 引擎,实体化视图无法处理复杂的 select 语句。
更长的版本:
我正在尝试使用 JSONEachRow 通过其 Kafka 引擎向 Clickhouse 发送大量 JSON 数据点。但是物化视图不会正确使用流。 我有一个用go编写的kafka生产者,它从多个tcp流中获取数据并异步写入kafka队列。
数据流向:
TCP Sources -> Producer -> Kafka -> Clickhouse(Kafka Engine) -> Materialized View -> Destination Table
所有这一切都有效,到目前为止一切顺利。
当我提高输入数据的速度(400,000 points/sec)时,我首先遇到了瓶颈,我的生产者无法足够快地写入 kafka,连接堆积起来。所以我希望尝试对数据进行批处理,但 Clickhouse 似乎无法将 json 的数组作为输入 (https://clickhouse.yandex/docs/en/interfaces/formats/)
所以我想到了在源头对数据点进行批处理并在物化视图中转换消息的想法,所以之前我有很多单独的消息:
{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}" }
我现在有一条消息是上面的倍数并被字符串化,两点之间有换行符。
{"realtimes":"{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}\n{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}"}
这里的目的是在实体化视图的select语句中使用visitParamExtract将字符串解析并转换为多个值。
物化视图:
CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT
visitParamExtractInt(x, 't') AS timestamp,
visitParamExtractString(x, 'i') AS device_id,
visitParamExtractInt(x, 'v') AS value FROM (
SELECT arrayJoin(*) AS x
FROM
(
SELECT splitByChar('\n', realtimes)
FROM kafka_stream_realtimes
) )
它似乎在做某事,因为当它处于 运行 时,kafka_stream_realtimes 被清除,我无法手动查询它得到一个错误 "DB::Exception: Failed to claim consumer: ." 但数据从未命中最终 table.
总结:
- 数据到达 clickhouse,它只是消失了,似乎永远不会消失 到达决赛 table.
- 如果我删除物化视图,我可以看到数据在 kafka_stream_realtimes
- 如果我运行实体化视图查询作为INSERT INTO语句 接着是 select,它会从流中获取数据到 最终 table.
- 我意识到我可能只是将瓶颈推到了 clickhouse 这可能永远行不通,但我想通过它 完整性
完整性: kafka_stream_realimes:
CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');
ltdb:
CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;
but it seems Clickhouse cannot take an array of json as input
似乎动机是在生产者端进行批量提交。为什么不将一堆 JSON 行分组并一次性提交? ClickHouse 将接收这些多行消息并为您解析它们。您可能还需要向 Kafka 引擎提供 kafka_row_delimiter
设置,因为大多数 Kafka 生产者不会在每条消息的末尾附加行分隔符。
所以一条消息变成了
{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
...