是否可以将不完整的事件合并到 KTable 中?
Is it possible to merge incomplete events into a KTable?
不知道KTable能不能满足我们的需要
假设我在 Kafka 中有一个包含事件的主题 myTopic
,我在这个主题上插入了一个 KafkaStreams 应用程序。
假设在时间 t0
,myTopic
包含 3 个事件:
"key_1": { "col_1": "val_k1_c1_@t0", "col_2": "val_k1_c2_@t0"}
"key_2": { "col_1": "val_k2_c1_@t0", "col_2": "val_k2_c2_@t0"}
"key_3": { "col_1": "val_k3_c1_@t0", "col_3": "val_k3_c3_@t0"}
在时间 t1
,一个与 key_1
相关的新事件被推入 myTopic
:
"key_1": {"col_1": "NEWVAL_k1_c1_@t1", "col_2": "NEWVAL_k1_c2_@t1"}
所以在时间t1
,KTable可以这样表示:
KEY COL_1 COL_2 COL_3
key_1 NEWVAL_k1_c1_@t1 NEWVAL_k1_c2_@t1
key_2 val_k2_c1_@t0 val_k2_c2_@t0
key_3 val_k3_c1_@t0 val_k3_c3_@t0
这就是 KTable 的关键概念:保持给定键的最新值。
现在假设在时间 t2
有一个新事件到达,以丰富 key_1
:
"key_1": { "col_3": "val_k1_c3_@t2" }
如果我的理解没问题,KTable 将丢失 col_1
和 col_2
的值,table 将看起来像这样:
KEY COL_1 COL_2 COL_3
key_1 val_k1_c3_@t2
key_2 val_k2_c1_@t0 val_k2_c2_@t0
key_3 val_k3_c1_@t0 val_k3_c3_@t0
我理解这个概念了吗?
是否有一种本机方法来合并值以使 KTable 看起来像这样?
KEY COL_1 COL_2 COL_3
key_1 NEWVAL_k1_c1_@t1 NEWVAL_k1_c2_@t1 val_k1_c3_@t2
key_2 val_k2_c1_@t0 val_k2_c2_@t0
key_3 val_k3_c1_@t0 val_k3_c3_@t0
您的理解是正确的。请注意,KTable
将每条记录视为更新整行的 "complete" 记录。因此,从概念上讲,"key_1": { "col_3": "val_k1_c3_@t2" }
被解释为 col_1: null
和 col_2: null
.
但是,您可以将其作为 KStream
阅读并执行 aggregation()
,而不是直接将主题阅读为 KTable。这允许您相应地更新结果 KTable
。
不知道KTable能不能满足我们的需要
假设我在 Kafka 中有一个包含事件的主题 myTopic
,我在这个主题上插入了一个 KafkaStreams 应用程序。
假设在时间 t0
,myTopic
包含 3 个事件:
"key_1": { "col_1": "val_k1_c1_@t0", "col_2": "val_k1_c2_@t0"}
"key_2": { "col_1": "val_k2_c1_@t0", "col_2": "val_k2_c2_@t0"}
"key_3": { "col_1": "val_k3_c1_@t0", "col_3": "val_k3_c3_@t0"}
在时间 t1
,一个与 key_1
相关的新事件被推入 myTopic
:
"key_1": {"col_1": "NEWVAL_k1_c1_@t1", "col_2": "NEWVAL_k1_c2_@t1"}
所以在时间t1
,KTable可以这样表示:
KEY COL_1 COL_2 COL_3
key_1 NEWVAL_k1_c1_@t1 NEWVAL_k1_c2_@t1
key_2 val_k2_c1_@t0 val_k2_c2_@t0
key_3 val_k3_c1_@t0 val_k3_c3_@t0
这就是 KTable 的关键概念:保持给定键的最新值。
现在假设在时间 t2
有一个新事件到达,以丰富 key_1
:
"key_1": { "col_3": "val_k1_c3_@t2" }
如果我的理解没问题,KTable 将丢失 col_1
和 col_2
的值,table 将看起来像这样:
KEY COL_1 COL_2 COL_3
key_1 val_k1_c3_@t2
key_2 val_k2_c1_@t0 val_k2_c2_@t0
key_3 val_k3_c1_@t0 val_k3_c3_@t0
我理解这个概念了吗?
是否有一种本机方法来合并值以使 KTable 看起来像这样?
KEY COL_1 COL_2 COL_3
key_1 NEWVAL_k1_c1_@t1 NEWVAL_k1_c2_@t1 val_k1_c3_@t2
key_2 val_k2_c1_@t0 val_k2_c2_@t0
key_3 val_k3_c1_@t0 val_k3_c3_@t0
您的理解是正确的。请注意,KTable
将每条记录视为更新整行的 "complete" 记录。因此,从概念上讲,"key_1": { "col_3": "val_k1_c3_@t2" }
被解释为 col_1: null
和 col_2: null
.
但是,您可以将其作为 KStream
阅读并执行 aggregation()
,而不是直接将主题阅读为 KTable。这允许您相应地更新结果 KTable
。