Auto-incrementing column in ksqlDB?
I'm currently using the process below to generate an auto-incrementing column in ksqlDB. But now I'm wondering whether this approach has a race condition or other synchronization problems. Is this a good way to generate an auto-incrementing column in ksqlDB? If not, is there a better way?
Suppose you want to insert values from one ksqlDB stream into another while auto-incrementing some integer value in the
destination stream.
First, create the two streams:
CREATE STREAM dest (ROWKEY INT KEY, i INT, x INT) WITH (kafka_topic='test_dest', value_format='json', partitions=1);
CREATE STREAM src (x INT) WITH (kafka_topic='test_src', value_format='json', partitions=1);
Next, create a materialized view that will contain the maximum value of the destination stream.
CREATE TABLE dest_maxi AS SELECT MAX(i) AS i FROM dest GROUP BY 1;
We need to be able to join the source stream to the materialized view. To do so, we'll create another intermediate stream with a dummy column, one, that's always set to 1, which is what we grouped the materialized view on:
CREATE STREAM src_one AS SELECT x, 1 AS one FROM src;
INSERT INTO dest
SELECT COALESCE(dest_maxi.i, 0) + 1 AS i, src_one.x AS x
FROM src_one
LEFT JOIN dest_maxi ON src_one.one = dest_maxi.ROWKEY
PARTITION BY COALESCE(dest_maxi.i, 0) + 1
EMIT CHANGES;
Now you can insert values into stream src and watch them come up in stream dest with auto-incrementing IDs.
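For reference, here's a plain-Python sketch (not ksqlDB code) of the happy-path behaviour this pipeline is aiming for, assuming every step is applied strictly in order:

```python
# Sketch (plain Python, not ksqlDB): the intended happy path, with the
# materialized view always up to date before the next insert runs.

def pipeline(src_values):
    dest = []        # destination rows: (i, x)
    dest_maxi = 0    # MAX(i) over dest; 0 stands in for "no row yet"
    for x in src_values:
        # LEFT JOIN on the dummy one = 1 key, then COALESCE(dest_maxi.i, 0) + 1
        i = dest_maxi + 1
        dest.append((i, x))
        dest_maxi = max(dest_maxi, i)  # the materialized view catches up
    return dest

print(pipeline([1234, 24746, 24848]))  # [(1, 1234), (2, 24746), (3, 24848)]
```

Whether ksqlDB actually interleaves the two queries this neatly is exactly what the question is about.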
I don't think your approach will work. ksqlDB makes no guarantees about the order in which records are processed across two different queries. In your case, that means there is no ordering guarantee that
CREATE TABLE dest_maxi AS <query>;
will run and update dest_maxi before
INSERT INTO dest <query>;
runs. So I think you'll run into problems.
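To see why this matters, here's a small plain-Python sketch (not ksqlDB code) simulating two independent stream processors that communicate only through a shared max table. The lag parameter models the table-update query running behind the insert query, which ksqlDB permits:

```python
# Sketch (plain Python, not ksqlDB): the INSERT query reads dest_maxi,
# but the query maintaining dest_maxi may apply its updates later.

def run_with_lag(inputs, table_update_lag):
    dest = []          # rows in the dest stream: (id, x)
    dest_maxi = {}     # the materialized max table, keyed by the constant 1
    pending = []       # MAX() updates not yet applied (processing lag)

    for x in inputs:
        # INSERT INTO dest ... reads whatever dest_maxi holds *right now*.
        next_id = dest_maxi.get(1, 0) + 1
        dest.append((next_id, x))
        pending.append(next_id)
        # The CREATE TABLE dest_maxi query applies its updates with a lag.
        if len(pending) > table_update_lag:
            dest_maxi[1] = max(dest_maxi.get(1, 0), pending.pop(0))
    return dest

# With zero lag the ids are unique ...
print(run_with_lag([10, 20, 30], 0))  # [(1, 10), (2, 20), (3, 30)]
# ... but with any lag the insert query reads a stale max and ids repeat.
print(run_with_lag([10, 20, 30], 1))  # [(1, 10), (1, 20), (2, 30)]
```

The duplicated id 1 in the second run is the race the answer is warning about.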
It looks like you're trying to take a stream of numbers, e.g.
1234
24746
24848
4947
34
and add an auto-incrementing id column so that the result looks like this:
1, 1234
2, 24746
3, 24848
4, 4947
5, 34
Something like this should do what you need:
-- source stream of numbers:
CREATE STREAM src (
x INT
) WITH (
kafka_topic='test_src',
value_format='json'
);
-- intermediate 'table' of numbers and current count:
CREATE TABLE with_counter
WITH (partitions = 1) AS
SELECT
1 as k,
LATEST_BY_OFFSET(x) as x,
COUNT(1) AS id
FROM src
GROUP BY 1;
-- if you need this back as a stream in ksqlDB you can run:
CREATE STREAM dest (
x INT,
id BIGINT
) WITH (
kafka_topic='WITH_COUNTER',
value_format='json'
);
UDAFs calculate values per key, so we group by a constant, ensuring all input rows are funnelled into a single key (and a single partition, so this won't scale well!).
We use COUNT to count the number of rows seen, so its output auto-increments, and we use LATEST_BY_OFFSET to capture the current value of x into our table.
The changelog of the with_counter table will contain the output you want, with a constant key of 1:
1 -> 1, 1234
1 -> 2, 24746
1 -> 3, 24848
1 -> 4, 4947
1 -> 5, 34
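The aggregation semantics above can be sketched in plain Python (not ksqlDB code): grouping on a constant key keeps one shared aggregate, COUNT increments it per row, and LATEST_BY_OFFSET overwrites the stored value, with every update emitted to the changelog:

```python
# Sketch (plain Python, not ksqlDB): how GROUP BY a constant plus
# COUNT + LATEST_BY_OFFSET produces the changelog shown above.

def with_counter_changelog(src):
    state = {}      # per-key aggregate state: key -> {"id": count, "x": latest}
    changelog = []  # every update the table emits: (key, id, x)
    for x in src:
        k = 1  # GROUP BY 1: every row lands on the same key
        agg = state.setdefault(k, {"id": 0, "x": None})
        agg["id"] += 1   # COUNT(1): one more row seen
        agg["x"] = x     # LATEST_BY_OFFSET(x): keep the newest value
        changelog.append((k, agg["id"], agg["x"]))
    return changelog

for key, id_, x in with_counter_changelog([1234, 24746, 24848, 4947, 34]):
    print(f"{key} -> {id_}, {x}")
```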
We import this back into ksqlDB as the dest stream. You can use it as normal. If you want the topic without the key, you can run:
CREATE STREAM without_key AS SELECT * FROM dest;