Auto-incrementing column in ksqlDB?

I'm currently using this process (see below) to generate an auto-incrementing column in ksqlDB, but now I'm wondering whether this approach has race conditions or other synchronization issues. Is this a good way to generate an auto-incrementing column in ksqlDB? If not, is there a better way?

Suppose you want to insert values from one ksqlDB stream into another while auto-incrementing some integer value in the destination stream.

First, create the two streams:

CREATE STREAM dest (ROWKEY INT KEY, i INT, x INT) WITH (kafka_topic='test_dest', value_format='json', partitions=1);
CREATE STREAM src (x INT) WITH (kafka_topic='test_src', value_format='json', partitions=1);

Next, create a materialized view that will contain the maximum value of the destination stream.

CREATE TABLE dest_maxi AS SELECT MAX(i) AS i FROM dest GROUP BY 1;
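
To sanity-check the table while testing, a pull query along these lines should work, assuming your ksqlDB version supports pull queries against this aggregate and names its key column ROWKEY (the join below relies on that name):

-- pull query: inspect the current maximum (assumes key column is ROWKEY)
SELECT * FROM dest_maxi WHERE ROWKEY = 1;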

We need to be able to join the source stream to the materialized view. To do so, we'll create another intermediate stream with a dummy column, one, that's always set to 1, which is what we grouped the materialized view on:

CREATE STREAM src_one AS SELECT x, 1 AS one FROM src;
INSERT INTO dest
  SELECT
    COALESCE(dest_maxi.i, 0) + 1 AS i,
    src_one.x AS x
  FROM src_one
  LEFT JOIN dest_maxi ON src_one.one = dest_maxi.ROWKEY
  PARTITION BY COALESCE(dest_maxi.i, 0) + 1
  EMIT CHANGES;

Now you can insert values into stream src and watch them come up in stream dest with auto-incrementing IDs.
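
For example, from the ksqlDB CLI, something along these lines should let you try it out (the SET statement is only needed if you want the push query to also show rows inserted before it started):

SET 'auto.offset.reset' = 'earliest';

-- insert a couple of test values into the source stream
INSERT INTO src (x) VALUES (1234);
INSERT INTO src (x) VALUES (24746);

-- push query to watch the auto-incremented ids arrive
SELECT i, x FROM dest EMIT CHANGES;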

I don't think your approach will work. ksqlDB makes no guarantees about the order in which records are processed across two different queries. In your case, this means there is no guarantee that

CREATE TABLE dest_maxi AS <query>;

will run and update dest_maxi before

INSERT INTO dest <query>;

runs. Hence, I think you'll run into issues.

It looks like you're trying to take a stream of numbers, e.g.

1234
24746
24848
4947
34

and add an auto-incrementing id column, so the result looks like this:

1, 1234
2, 24746
3, 24848
4, 4947
5, 34

Something like this should give you what you need:

-- source stream of numbers:
CREATE STREAM src (
     x INT
  ) WITH (
    kafka_topic='test_src', 
    value_format='json'
  );

-- intermediate 'table' of numbers and current count:
CREATE TABLE with_counter 
   WITH (partitions = 1) AS
   SELECT
      1 as k,
      LATEST_BY_OFFSET(x) as x,
      COUNT(1) AS id
   FROM src
   GROUP BY 1;


-- if you need this back as a stream in ksqlDB you can run:
CREATE STREAM dest (
     x INT,
     id BIGINT
   ) WITH (
     kafka_topic='WITH_COUNTER',
     value_format='json'
   );

UDAFs calculate values per key, so we group by a constant to ensure all input rows end up under a single key (and partition, so this won't scale well!).

We use COUNT to count the number of rows seen, so its output auto-increments, and we use LATEST_BY_OFFSET to grab the current value of x into our table.

The changelog of the with_counter table will contain the output you want, with a constant key of 1:

1 -> 1, 1234
1 -> 2, 24746
1 -> 3, 24848
1 -> 4, 4947
1 -> 5, 34

We import this back into ksqlDB as the dest stream, which you can use as normal. If you want the topic without the key, you can run:

CREATE STREAM without_key AS SELECT * FROM dest;
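
As a quick end-to-end check, you can then insert a few values into src and watch the ids increment, e.g. (again, you may need SET 'auto.offset.reset' = 'earliest'; first to see rows produced before the query started):

INSERT INTO src (x) VALUES (4947);
INSERT INTO src (x) VALUES (34);

-- push query to watch the counter grow
SELECT id, x FROM without_key EMIT CHANGES;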