Clickhouse Kafka engine on cluster
I'm using the Kafka engine on a ClickHouse cluster. The ClickHouse 22.1 cluster and Kafka currently run in Docker. Here is the configuration: https://github.com/apanasevich/clickhouse-etl-cluster
Here is the DDL for the Kafka integration:
CREATE DATABASE etl ON CLUSTER cluster;

CREATE TABLE etl.tmp ON CLUSTER cluster
(
    `event_timestamp` DateTime64(3, 'Europe/Moscow'),
    `account_id` Int32,
    `session` String,
    `type` Int16
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/etl/tmp', '{replica}')
PARTITION BY toMonday(event_timestamp)
ORDER BY (account_id, event_timestamp)
TTL toDateTime(event_timestamp) + toIntervalDay(90);

CREATE TABLE etl.tmp_kafka_datasource
(
    `topic_data` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:29092', kafka_topic_list = 'Tmp',
         kafka_group_name = 'clickhouse_etl_group', kafka_format = 'JSONAsString', kafka_num_consumers = 1;

CREATE MATERIALIZED VIEW etl.tmp_consumer TO etl.tmp
(
    `event_timestamp` DateTime64(3),
    `account_id` Int32,
    `session` String,
    `type` Int16
) AS
SELECT
    fromUnixTimestamp64Milli(JSONExtract(topic_data, 'time', 'Int64')) AS event_timestamp,
    toInt32(JSON_VALUE(topic_data, '$.data.accountId')) AS account_id,
    JSON_VALUE(topic_data, '$.data.session') AS session,
    toInt16(JSON_VALUE(topic_data, '$.data.type')) AS type
FROM etl.tmp_kafka_datasource;
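For reference, the extraction expressions in the view assume messages shaped roughly like {"time": ..., "data": {"accountId": ..., "session": ..., "type": ...}} (inferred from the JSON paths above; the real payload may differ). A quick way to sanity-check the parsing is to run the same SELECT over a literal string with made-up values:

SELECT
    fromUnixTimestamp64Milli(JSONExtract(topic_data, 'time', 'Int64')) AS event_timestamp,
    toInt32(JSON_VALUE(topic_data, '$.data.accountId')) AS account_id,
    JSON_VALUE(topic_data, '$.data.session') AS session,
    toInt16(JSON_VALUE(topic_data, '$.data.type')) AS type
FROM (SELECT '{"time":1646984394519,"data":{"accountId":42,"session":"abc","type":1}}' AS topic_data);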
The problem is that there is no data in table tmp, even though the view does consume the topic (the corresponding consumer group in Kafka has no lag).
At the same time, data can be inserted successfully with a direct INSERT INTO tmp statement, and it is replicated to the second host.
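For example, a direct insert along these lines (illustrative values, not the real payload) shows up on both replicas:

INSERT INTO etl.tmp (event_timestamp, account_id, session, type)
VALUES (now64(3), 1, 'test-session', 1);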
Edited:
Data from the topic can be consumed successfully with a direct select * from etl.tmp_kafka_datasource query.
The same scenario works fine on a single ClickHouse instance.
I have removed the volume mappings from the Docker files, and now the only errors are:
2022.03.11 07:39:54.519652 [ 1 ] {} <Warning> Application: Listen [::]:9005 failed: Poco::Exception. Code: 1000, e.code() = 0, DNS error: EAI: Address family for hostname not supported (version 21.11.4.14 (official build)). If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> . Example for disabled IPv4: <listen_host>::</listen_host>
2022.03.11 07:39:54.641049 [ 249 ] {} <Error> virtual bool DB::DDLWorker::initializeMainThread(): Code: 999. Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
(Connection loss). (KEEPER_EXCEPTION), Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x9b605d4 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0x134a60d5 in /usr/bin/clickhouse
2. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0x134a6416 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0x134e6202 in /usr/bin/clickhouse
4. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan, std::__1::shared_ptr<DB::ZooKeeperLog>) @ 0x134e475c in /usr/bin/clickhouse
5. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0x134a89e1 in /usr/bin/clickhouse
6. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::ZooKeeperLog>) @ 0x134ab12d in /usr/bin/clickhouse
7. void std::__1::allocator<zkutil::ZooKeeper>::construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10], std::__1::shared_ptr<DB::ZooKeeperLog> >(zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10], std::__1::shared_ptr<DB::ZooKeeperLog>&&) @ 0x11ea3abb in /usr/bin/clickhouse
8. DB::Context::getZooKeeper() const @ 0x11e83856 in /usr/bin/clickhouse
9. DB::DDLWorker::getAndSetZooKeeper() @ 0x11ed5e6c in /usr/bin/clickhouse
10. DB::DDLWorker::initializeMainThread() @ 0x11ee746c in /usr/bin/clickhouse
11. DB::DDLWorker::runMainThread() @ 0x11ed38f4 in /usr/bin/clickhouse
12. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0x11ee847a in /usr/bin/clickhouse
13. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x9ba2697 in /usr/bin/clickhouse
14. ? @ 0x9ba609d in /usr/bin/clickhouse
15. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so
16. clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so
(version 21.11.4.14 (official build))
2022.03.11 07:42:23.082220 [ 249 ] {eef27851-f848-4b5a-8a95-cf568951cf75} <Warning> etl.tmp (3eb29f08-e712-4705-beb2-9f08e712c705): It looks like the table /clickhouse/tables/01/etl/tmp was created by another server at the same moment, will retry
What am I doing wrong on the cluster?
Edit 2: Also, I changed the engine of the tmp table from ReplicatedMergeTree to MergeTree, and the integration started working. So, could the problem be related to an incorrect replication setup?
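For that experiment the tmp table was recreated with a non-replicated engine, i.e. something like the following (same schema, only the engine clause changes):

CREATE TABLE etl.tmp ON CLUSTER cluster
(
    `event_timestamp` DateTime64(3, 'Europe/Moscow'),
    `account_id` Int32,
    `session` String,
    `type` Int16
)
ENGINE = MergeTree
PARTITION BY toMonday(event_timestamp)
ORDER BY (account_id, event_timestamp)
TTL toDateTime(event_timestamp) + toIntervalDay(90);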
The problem was that the DDL for the view was incorrect. Unfortunately, it somehow worked with the MergeTree engine but did not work with the ReplicatedMergeTree engine.
Here is the corrected script:
CREATE MATERIALIZED VIEW etl.tmp_consumer TO etl.tmp AS
SELECT ......
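Spelled out (assuming the SELECT body stays the same as in the original view), the corrected statement looks like this; the explicit column list on the materialized view is simply dropped:

CREATE MATERIALIZED VIEW etl.tmp_consumer TO etl.tmp AS
SELECT
    fromUnixTimestamp64Milli(JSONExtract(topic_data, 'time', 'Int64')) AS event_timestamp,
    toInt32(JSON_VALUE(topic_data, '$.data.accountId')) AS account_id,
    JSON_VALUE(topic_data, '$.data.session') AS session,
    toInt16(JSON_VALUE(topic_data, '$.data.type')) AS type
FROM etl.tmp_kafka_datasource;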