Duplicated keys in ksql table
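I'm trying to create a ksql table that holds the latest version of an entity, and I'd like to know what the recommended approach is.
So far I have tried the following: I load invoices into Kafka with Debezium, and then I created the following stream so that I can consume them from ksql: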
ksql> create stream invoice_stream with (kafka_topic='dbserver1.invoices.invoice', value_format='AVRO');
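Debezium adds data about the state of the database table row before and after the change, which makes it a bit difficult to use, so I created another stream on top of it to get only the data I'm interested in: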
create stream invoice
with (kafka_topic='invoice', value_format='AVRO')
as
select i.before->id as before_id,
i.after->id as after_id,
ifnull(i.transaction->id, 'NA') as transaction_id,
i.after->description as description,
i.after->invoice_date as invoice_date,
i.after->status as status
from invoice_stream i;
So far so good: I can query the stream with a push query and see the expected results:
ksql> select * from invoice emit changes;
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|ROWTIME |ROWKEY |BEFORE_ID |AFTER_ID |TRANSACTION_ID |DESCRIPTION |INVOICE_DATE |STATUS |
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|1583961059498 | |null |1 |NA |Invoice A |18201 |N |
|1583961059499 | |null |2 |NA |Invoice B |18205 |N |
|1583961059499 | |null |3 |NA |Invoice C |18210 |N |
|1583961059499 | |null |4 |NA |Invoice D |18215 |N |
|1583961263233 | |null |5 |623 |test line added later |18263 |N |
|1584007291546 | |5 |5 |625 |test line added later |18263 |P |
As there is no key, I created another stream on top where I specify the partition:
ksql> create stream invoice_rekeyed as select * from invoice partition by after_id;
ksql> describe invoice_rekeyed;
Name : INVOICE_REKEYED
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
BEFORE_ID | INTEGER
AFTER_ID | INTEGER (key)
TRANSACTION_ID | VARCHAR(STRING)
DESCRIPTION | VARCHAR(STRING)
INVOICE_DATE | INTEGER
STATUS | VARCHAR(STRING)
--------------------------------------------
Finally, I created a table like this:
create table invoice_table(before_id int, after_id int, transaction_id string, description string, invoice_date int, status string)
with (kafka_topic='INVOICE_REKEYED', key='after_id', value_format='AVRO');
So at this point I expected to be able to query the table by row key, but I get the following message:
ksql> select * from invoice_table where rowkey = 5;
Table 'INVOICE_TABLE' is not materialized. Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause
KSQL currently only supports pull queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT <fields>, <aggregate_functions> FROM <sources> GROUP BY <key>' style statement.
Query syntax in KSQL has changed. There are now two broad categories of queries:
- Pull queries: query the current state of the system, return a result, and terminate.
- Push queries: query the state of the system in motion and continue to output results until they meet a LIMIT condition or are terminated by the user.
'EMIT CHANGES' is used to to indicate a query is a push query. To convert a pull query into a push query, which was the default behavior in older versions of KSQL, add `EMIT CHANGES` to the end of the statement
before any LIMIT clause.
For example, the following are pull queries:
'SELECT * FROM X WHERE ROWKEY=Y;' (non-windowed table)
'SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>=Z;' (windowed table)
The following is a push query:
'SELECT * FROM X EMIT CHANGES;'
Note: Persistent queries, e.g. `CREATE TABLE AS ...`, have an implicit `EMIT CHANGES`, but we recommend adding `EMIT CHANGES` to these statements.
Also, if I run it as a push query, I see more than one row for key 5:
ksql> select * from invoice_table emit changes;
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|ROWTIME |ROWKEY |BEFORE_ID |AFTER_ID |TRANSACTION_ID |DESCRIPTION |INVOICE_DATE |STATUS |
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|1583961059498 |1 |null |1 |NA |Invoice A |18201 |N |
|1583961059499 |2 |null |2 |NA |Invoice B |18205 |N |
|1583961059499 |3 |null |3 |NA |Invoice C |18210 |N |
|1583961059499 |4 |null |4 |NA |Invoice D |18215 |N |
|1583961263233 |5 |null |5 |623 |test line added later |18263 |N |
|1584007291546 |5 |5 |5 |625 |test line added later |18263 |P |
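I'd like to understand why the table is not materialized, since according to the message above this seems to be what prevents me from querying the table by rowkey.
Thanks in advance
UPDATE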
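Trying the example Robin pointed me to, I actually get the expected behavior. In the example, running the query while updating the original database row makes the change appear: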
ksql> select * from customers where id = 5 emit changes;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME |ROWKEY |ID |FIRST_NAME |LAST_NAME |EMAIL |GENDER |CLUB_STATUS |COMMENTS |CREATE_TS |UPDATE_TS |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1584102664415 |5 |5 |Hansiain |Coda |hcoda4@senate.gov|Male |platinum |Centralized full-|2020-03-13T12:29:|2020-03-13T12:29:|
| | | | | | | | |range approach |53Z |53Z |
|1584102741712 |5 |5 |Rodrigo |Coda |hcoda4@senate.gov|Male |platinum |Centralized full-|2020-03-13T12:29:|2020-03-13T12:32:|
| | | | | | | | |range approach |53Z |21Z |
However, if the query is terminated and run again, only the latest version is returned:
ksql> select * from customers where id = 5 emit changes;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME |ROWKEY |ID |FIRST_NAME |LAST_NAME |EMAIL |GENDER |CLUB_STATUS |COMMENTS |CREATE_TS |UPDATE_TS |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1584102741712 |5 |5 |Rodrigo |Coda |hcoda4@senate.gov|Male |platinum |Centralized full-|2020-03-13T12:29:|2020-03-13T12:32:|
| | | | | | | | |range approach |53Z |21Z |
In my case, however, doing what is in principle the same thing always returns all versions of the row:
ksql> print 'dbserver1.invoices.invoice' from beginning limit 50;
Format:AVRO
3/13/20 12:23:09 PM UTC, 1, {"id": 1, "description": "Invoice A", "invoice_date": 18201, "status": "N", "__op": "r", "__ts_ms": 1584102188934, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 2, {"id": 2, "description": "Invoice B", "invoice_date": 18205, "status": "N", "__op": "r", "__ts_ms": 1584102188936, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 3, {"id": 3, "description": "Invoice C", "invoice_date": 18210, "status": "N", "__op": "r", "__ts_ms": 1584102188938, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 4, {"id": 4, "description": "Invoice D", "invoice_date": 18215, "status": "N", "__op": "r", "__ts_ms": 1584102188938, "__transaction_id": null}
^CTopic printing ceased
ksql> create table invoice_table with (kafka_topic='dbserver1.invoices.invoice', value_format='AVRO');
Message
---------------
Table created
---------------
ksql> select * from invoice_table where id = 4 emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|ROWTIME |ROWKEY |ID |DESCRIPTION |INVOICE_DATE |STATUS |__OP |__TS_MS |__TRANSACTION_ID |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1584102189675 |4 |4 |Invoice D |18215 |N |r |1584102188938 |null |
|1584102365378 |4 |4 |Invoice D UPDATED |18215 |N |u |1584102365128 |623 |
^CQuery terminated
ksql> select * from invoice_table where id = 4 emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|ROWTIME |ROWKEY |ID |DESCRIPTION |INVOICE_DATE |STATUS |__OP |__TS_MS |__TRANSACTION_ID |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1584102189675 |4 |4 |Invoice D |18215 |N |r |1584102188938 |null |
|1584102365378 |4 |4 |Invoice D UPDATED |18215 |N |u |1584102365128 |623 |
Do you know of any configuration that could cause this difference in behavior?
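There are a few things to unpack here, but the top-level answer is that pull queries are not supported on non-materialized tables. You haven't materialized yours, and you can't until #3985 is delivered.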
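The error message quoted in the question names the table shape that pull queries do support in this release: an aggregate built with CREATE TABLE AS SELECT ... GROUP BY. A minimal sketch of keeping the latest version of each invoice that way follows. It assumes a later ksqlDB release that provides the LATEST_BY_OFFSET aggregate function (the KSQL version that produced the error may not have it), and the table name invoice_latest is hypothetical.

```sql
-- Hypothetical aggregate table holding the most recent value of each field per invoice.
-- Assumes a ksqlDB version that ships the LATEST_BY_OFFSET aggregate function.
CREATE TABLE invoice_latest AS
  SELECT after_id,
         LATEST_BY_OFFSET(transaction_id) AS transaction_id,
         LATEST_BY_OFFSET(description)    AS description,
         LATEST_BY_OFFSET(invoice_date)   AS invoice_date,
         LATEST_BY_OFFSET(status)         AS status
  FROM invoice_rekeyed
  GROUP BY after_id
  EMIT CHANGES;

-- Pull query: returns the current row for key 5 and terminates.
SELECT * FROM invoice_latest WHERE after_id = 5;
```

The key column's name and type have changed across ksqlDB versions (older releases expose it as ROWKEY), so check DESCRIBE invoice_latest before writing the WHERE clause.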
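As you've seen, you can run a push query against the table. The multiple rows you see in the output reflect changes in state. If you cancel the push query and run it again, you will see only one state per key.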
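To watch the sequence of state changes for one key, a push query can be run against the table from the start of its topic. This is a sketch for a CLI session. The cache setting is an assumption about one factor that may explain why different tables show a different number of intermediate rows: the Kafka Streams record cache can collapse consecutive updates to the same key before they are emitted.

```sql
-- Read the table's source topic from the earliest offset for this CLI session.
SET 'auto.offset.reset' = 'earliest';

-- Assumption: disabling the Kafka Streams record cache so that every update
-- to a key is emitted rather than possibly being collapsed into the latest one.
SET 'cache.max.bytes.buffering' = '0';

-- Push query: emits one row per change to invoice 4 and keeps running until cancelled.
SELECT * FROM invoice_table WHERE id = 4 EMIT CHANGES;
```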
"Debezium adds data about the state of the database table row before and after the change, which makes it a bit difficult to use"
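Take a look at the io.debezium.transforms.ExtractNewRecordState Single Message Transform, which flattens the payload and puts only the current state of the row in the message: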
'transforms'= 'unwrap',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
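Once the connector applies ExtractNewRecordState, the topic value already is the flat row (the PRINT output in the question's update shows id, description, invoice_date and status at the top level), so the intermediate projection stream is no longer needed. A sketch, assuming the unwrapped data keeps the topic name dbserver1.invoices.invoice; the stream name invoice_flat is hypothetical.

```sql
-- Hypothetical stream over the unwrapped topic: the value schema is now the flat row.
CREATE STREAM invoice_flat WITH (kafka_topic='dbserver1.invoices.invoice', value_format='AVRO');

-- Row fields are top-level columns, so no before->/after-> dereferencing is needed.
SELECT id, description, invoice_date, status FROM invoice_flat EMIT CHANGES;
```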
"As there is no key, I created another stream on top where I specify the partition:"
That is one way to do it, but a better way is to set the key as part of the Kafka Connect ingest:
'transforms'= 'extractkey',
'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractkey.field'= 'id',
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
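Kafka Connect reads a single transforms property whose value is an ordered, comma-separated list of transform aliases, so the two fragments above must be merged rather than each setting transforms on its own. A sketch of the combined configuration as a ksqlDB CREATE SOURCE CONNECTOR statement follows. The connector name, the choice of the Debezium MySQL connector, all database.* values and the Schema Registry URL are placeholders (the question does not say which database is used), and the property names follow Debezium 1.x, which later releases renamed in places.

```sql
-- Hypothetical connector; every connection value below is a placeholder.
CREATE SOURCE CONNECTOR invoices_source WITH (
  'connector.class'                          = 'io.debezium.connector.mysql.MySqlConnector',
  'database.hostname'                        = 'mysql',
  'database.port'                            = '3306',
  'database.user'                            = 'debezium',
  'database.password'                        = 'dbz',
  'database.server.id'                       = '42',
  'database.server.name'                     = 'dbserver1',
  'table.whitelist'                          = 'invoices.invoice',
  'database.history.kafka.bootstrap.servers' = 'kafka:9092',
  'database.history.kafka.topic'             = 'dbhistory.invoices',
  -- One transforms list: flatten the Debezium envelope, then pull id out of the key struct.
  'transforms'                  = 'unwrap,extractkey',
  'transforms.unwrap.type'      = 'io.debezium.transforms.ExtractNewRecordState',
  'transforms.extractkey.type'  = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractkey.field' = 'id',
  -- Plain string key; Avro value registered in Schema Registry.
  'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);
```

With the key set at ingest, a table can be declared directly on the topic, as in the question's update, without a PARTITION BY step.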
For examples of this in action, check out this recent QCon workshop.