KSQL KTable+KTable Join重复结果异常

KSQL KTabke+KTable Join dublicate result anomaly

我尝试内部连接 ​​ktable 和 ktable。

ab table:

 create table a_table(r string, time string) with (Kafka_topic='a', Key='r', Value_format='json');
 create table b_table(r string, time string) with (Kafka_topic='b', Key='r', Value_format='json');

内部加入 ab table 通过 r 键:

create table ab_table as select * from a_table inner join b_table on a_table.r = b_table.r emit changes;

1) 用例。以慢速模式插入新数据

 ksql> insert into a_table values('1','1', 'timeA');
 --wait 5 second;
 ksql> insert into b_table values('1','1', 'timeB');

select * from ab_table emit changes; --return 1行结果

print AB_TABLE from beginning; --return 1行结果

2) 用例。通过快速模式插入新数据

 ksql> insert into a_table values('2','2', 'timeA');insert into b_table values('2','2', 'timeB');

  ksql>  print a from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeA"}

   ksql> print b from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeB"}

select * from ab_table emit changes; --return 1行结果

print AB_TABLE from beginning; --return 2行结果

rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}

什么是地狱?为什么在第二个用例中我在主题中有两个重复行?

更新关于主题的信息\table

    name                 : B_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    name                 : A_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    Name                 : AB_TABLE
     Field           | Type                      
    ---------------------------------------------
     ROWTIME         | BIGINT           (system) 
     ROWKEY          | VARCHAR(STRING)  (system) 
     A_TABLE_ROWTIME | BIGINT                    
     A_TABLE_ROWKEY  | VARCHAR(STRING)           
     A_TABLE_R       | VARCHAR(STRING)           
     A_TABLE_TIME    | VARCHAR(STRING)           
     B_TABLE_ROWTIME | BIGINT                    
     B_TABLE_ROWKEY  | VARCHAR(STRING)           
     B_TABLE_R       | VARCHAR(STRING)           
     B_TABLE_TIME    | VARCHAR(STRING)     


topic "a" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

topic "b" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

 topic "AB_TABLE" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

弄清楚这里发生了什么。跟缓冲有关。

默认情况下,ksqlDB 正在缓冲来自两个来源 table 更新日志的输入,即主题 ab。 (此缓冲可用于将报告对同一键的更改的所有多条消息压缩到单个输出中)。

当同时触发对两个 table 的更新时,缓冲意味着当刷新缓冲时两个 table 都会被填充。由于 table-table 联接的双方都会产生输出,因此两个输入事件相互匹配,从而导致主题 AB_TABLE 的两个输出。

PRINT AB_TABLE 正确显示了更新日志中的两行。

但是,SELECT * FROM AB_TABLE EMIT CHANGES 也在缓冲输入,这种缓冲将两个更改压缩为一个输出。

可以通过 cache.max.bytes.buffering 控制缓冲。例如,您可以关闭缓冲:

-- turn off buffering:
SET 'cache.max.bytes.buffering' = 0;

我在 运行 之后再次 运行 你的例子, AB_TABLE 主题中只有一行。

有人可能会争辩说,无论任何缓冲,table-table 连接的正确输出都只是一行。毕竟,处理的第一行不应该找到匹配项,而第二行应该。如果您对此有强烈的感觉,请在 Github.

中提出错误