Kafka Connect JDBC sink - 存储来自 KSQL 的聚合数据
Kafka Connect JDBC sink - store aggregated data from KSQL
我正在尝试使用 kafka connect 将 ksql 查询 (windowed table) 的结果存储到数据库中。
我现在想做的是(使用 kafka jdbc connect)将它们存储在数据库中,但正在更新的行(因为 kafka table 当前更新值)而不仅仅是消息流;
所以尽管消息是这样的:
1558042958867 | User_9 : Window{start=1558042920000 end=-} | User_9 | 20
1558042961348 | User_9 : Window{start=1558042920000 end=-} | User_9 | 21
1558042962141 | User_9 : Window{start=1558042920000 end=-} | User_9 | 22
1558042965552 | User_9 : Window{start=1558042920000 end=-} | User_9 | 23
1558042968275 | User_9 : Window{start=1558042920000 end=-} | User_9 | 24
1558042969668 | User_9 : Window{start=1558042920000 end=-} | User_9 | 25
1558042973915 | User_9 : Window{start=1558042920000 end=-} | User_9 | 26
1558042976235 | User_9 : Window{start=1558042920000 end=-} | User_9 | 27
1558042980197 | User_9 : Window{start=1558042980000 end=-} | User_9 | 1
1558042980635 | User_9 : Window{start=1558042980000 end=-} | User_9 | 2
1558042982969 | User_9 : Window{start=1558042980000 end=-} | User_9 | 3
1558042983511 | User_9 : Window{start=1558042980000 end=-} | User_9 | 4
1558042986352 | User_9 : Window{start=1558042980000 end=-} | User_9 | 5
1558042986863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 6
1558042988328 | User_9 : Window{start=1558042980000 end=-} | User_9 | 7
1558042988863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 8
在数据库中我只想拥有:
User_9 : Window{start=1558042920000 end=-} | User_9 | 27
User_9 : Window{start=1558042980000 end=-} | User_9 | 8
像这样。
ksql/kafka-connect 有什么魔法可以让我做到这一点吗?
为了澄清 - 最后一个字段是这个聚合,它计算在 window 时间内到目前为止发生了多少次 x。
我假设我可以获得 window start + key 作为数据库键并对其进行更新,但我不确定如何在 KSQL 中实现它。也许 Kafka Streams 可以做到这一点?
@编辑:
好的,所以我设法通过将这些属性添加到接收器配置来做到这一点:
pk.mode=record_key
pk.fields=rowkey
insert.mode=upsert
现在行已更新,但 window 数据有些乱码,看起来像这样:
TOTAL USERID rowkey
32 User_9 User_9j�
31 User_9 User_9jı�`
22 User_9 User_9jIJ��
1 User_9 User_9jij�
所以 window 在那里,但是二进制编码?不知道那里发生了什么。
我仍然需要以某种格式获取此日期,即可读
好的,找到解决办法了。
所以,首先我需要的是在查询中创建 window_start/window_end 字段,就像这样:
SELECT [...], WINDOWSTART() AS window_start, WINDOWEND() AS window_end, [...]
之后我不得不将这些参数添加到接收器:
pk.mode=record_value
pk.fields=[...],WINDOW_START
insert.mode=upsert
有效。
我正在尝试使用 kafka connect 将 ksql 查询 (windowed table) 的结果存储到数据库中。
我现在想做的是(使用 kafka jdbc connect)将它们存储在数据库中,但正在更新的行(因为 kafka table 当前更新值)而不仅仅是消息流;
所以尽管消息是这样的:
1558042958867 | User_9 : Window{start=1558042920000 end=-} | User_9 | 20
1558042961348 | User_9 : Window{start=1558042920000 end=-} | User_9 | 21
1558042962141 | User_9 : Window{start=1558042920000 end=-} | User_9 | 22
1558042965552 | User_9 : Window{start=1558042920000 end=-} | User_9 | 23
1558042968275 | User_9 : Window{start=1558042920000 end=-} | User_9 | 24
1558042969668 | User_9 : Window{start=1558042920000 end=-} | User_9 | 25
1558042973915 | User_9 : Window{start=1558042920000 end=-} | User_9 | 26
1558042976235 | User_9 : Window{start=1558042920000 end=-} | User_9 | 27
1558042980197 | User_9 : Window{start=1558042980000 end=-} | User_9 | 1
1558042980635 | User_9 : Window{start=1558042980000 end=-} | User_9 | 2
1558042982969 | User_9 : Window{start=1558042980000 end=-} | User_9 | 3
1558042983511 | User_9 : Window{start=1558042980000 end=-} | User_9 | 4
1558042986352 | User_9 : Window{start=1558042980000 end=-} | User_9 | 5
1558042986863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 6
1558042988328 | User_9 : Window{start=1558042980000 end=-} | User_9 | 7
1558042988863 | User_9 : Window{start=1558042980000 end=-} | User_9 | 8
在数据库中我只想拥有:
User_9 : Window{start=1558042920000 end=-} | User_9 | 27
User_9 : Window{start=1558042980000 end=-} | User_9 | 8
像这样。 ksql/kafka-connect 有什么魔法可以让我做到这一点吗?
为了澄清 - 最后一个字段是这个聚合,它计算在 window 时间内到目前为止发生了多少次 x。
我假设我可以获得 window start + key 作为数据库键并对其进行更新,但我不确定如何在 KSQL 中实现它。也许 Kafka Streams 可以做到这一点?
@编辑:
好的,所以我设法通过将这些属性添加到接收器配置来做到这一点:
pk.mode=record_key
pk.fields=rowkey
insert.mode=upsert
现在行已更新,但 window 数据有些乱码,看起来像这样:
TOTAL USERID rowkey
32 User_9 User_9j�
31 User_9 User_9jı�`
22 User_9 User_9jIJ��
1 User_9 User_9jij�
所以 window 在那里,但是二进制编码?不知道那里发生了什么。 我仍然需要以某种格式获取此日期,即可读
好的,找到解决办法了。 所以,首先我需要的是在查询中创建 window_start/window_end 字段,就像这样:
SELECT [...], WINDOWSTART() AS window_start, WINDOWEND() AS window_end, [...]
之后我不得不将这些参数添加到接收器:
pk.mode=record_value
pk.fields=[...],WINDOW_START
insert.mode=upsert
有效。