KSQL:将多个 child 记录附加到 parent 记录
KSQL: append multiple child records to parent record
我正在尝试使用 KSQL(作为 confluent-5.0.0 的一部分)从一组 parent 记录和 child 记录中创建单个记录,其中每个 parent 记录有多个 child 记录(特别是付款明细和付款涉及的各方)。这些 parent/child 记录由 parent 的 ID 链接。为了说明,我正在处理源系统中大致这种结构的记录:
payment:
| id | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 |
| pmt02 | USD | 13000 | 2018-11-23 |
payment_parties:
| id | payment_id | party_type | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01 | sender | XXYYZZ23 | (null) |
| prt02 | pmt01 | intermediary | AADDEE98 | 123456789 |
| prt03 | pmt01 | receiver | FFGGHH56 | 987654321 |
| prt04 | pmt02 | sender | XXYYZZ23 | (null) |
| prt05 | pmt02 | intermediary | (null) | (null) |
| prt06 | pmt02 | receiver | FFGGHH56 | 987654321 |
这些记录以 AVRO 格式加载到使用 Oracle Golden Gate 的一组 Kafka 主题上,每个 table 有一个主题。这意味着存在以下主题:src_payment
和 src_payment_parties
。根据源系统的运行方式,这些记录的时间戳在几毫秒内。
现在,目的是将这些记录 'flatten' 合并为一条记录,该记录将从传出主题中使用。为了说明,对于上面的记录,所需的输出将沿着这些行:
payment_flattened:
| id | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | (null) | AADDEE98 | 123456789 | FFGGHH56 | 987654321 |
| pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | (null) | (null) | (null) | FFGGHH56 | 987654321 |
我想在这里问的第一个问题如下:我怎样才能最好地实现来自源主题的数据组合?
当然,我自己也尝试过一些动作。为简洁起见,我将描述我试图将第一个付款方附加到付款记录的目的。
第一步:设置源流
注意:由于 OGG 设置向 AVRO 模式添加了一个名为 'table' 的 属性,我必须指定要从主题中获取的字段。此外,我对指定操作类型(例如插入或更新)的字段不感兴趣。
create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');
create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');
第二步:为付款发送者创建流
注意:根据我从文档中收集到的信息,并通过实验发现,为了能够将支付流加入支付方流,后者需要按支付 ID 进行分区。此外,我让连接起作用的唯一方法是重命名该列。
create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;
第三步:加入两个流
注意:我使用的是左连接,因为并非所有参与方都出席了每笔付款。如上面的示例记录,其中 pmt02
没有中介。
create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;
现在,我希望从这个流中得到的输出是这样的:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
相反,我看到的输出是这样的:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
因此,我想问的第二个(two-part)问题是:为什么左连接会产生这些重复的记录?这可以避免吗?
对于文字墙的道歉,我试图在问题描述中尽可能完整。当然,我很乐意添加任何可能遗漏的信息,并尽我所知回答有关设置的问题。
你快到了:-)
WITHIN 1 SECONDS
将为您提供从连接的 双方 触发的结果。
改为尝试 WITHIN (0 SECONDS, 1 SECONDS)
。然后只有连接右侧的记录才会连接到左侧,反之亦然。
您可以在文章 I wrote here 中阅读有关此模式的更多信息。
顺便说一句,如果你想解决来自 OGG 的 table
保留字问题,你可以在 GG 配置中设置 includeTableName
to false
。
我正在尝试使用 KSQL(作为 confluent-5.0.0 的一部分)从一组 parent 记录和 child 记录中创建单个记录,其中每个 parent 记录有多个 child 记录(特别是付款明细和付款涉及的各方)。这些 parent/child 记录由 parent 的 ID 链接。为了说明,我正在处理源系统中大致这种结构的记录:
payment:
| id | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 |
| pmt02 | USD | 13000 | 2018-11-23 |
payment_parties:
| id | payment_id | party_type | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01 | sender | XXYYZZ23 | (null) |
| prt02 | pmt01 | intermediary | AADDEE98 | 123456789 |
| prt03 | pmt01 | receiver | FFGGHH56 | 987654321 |
| prt04 | pmt02 | sender | XXYYZZ23 | (null) |
| prt05 | pmt02 | intermediary | (null) | (null) |
| prt06 | pmt02 | receiver | FFGGHH56 | 987654321 |
这些记录以 AVRO 格式加载到使用 Oracle Golden Gate 的一组 Kafka 主题上,每个 table 有一个主题。这意味着存在以下主题:src_payment
和 src_payment_parties
。根据源系统的运行方式,这些记录的时间戳在几毫秒内。
现在,目的是将这些记录 'flatten' 合并为一条记录,该记录将从传出主题中使用。为了说明,对于上面的记录,所需的输出将沿着这些行:
payment_flattened:
| id | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | (null) | AADDEE98 | 123456789 | FFGGHH56 | 987654321 |
| pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | (null) | (null) | (null) | FFGGHH56 | 987654321 |
我想在这里问的第一个问题如下:我怎样才能最好地实现来自源主题的数据组合?
当然,我自己也尝试过一些动作。为简洁起见,我将描述我试图将第一个付款方附加到付款记录的目的。
第一步:设置源流
注意:由于 OGG 设置向 AVRO 模式添加了一个名为 'table' 的 属性,我必须指定要从主题中获取的字段。此外,我对指定操作类型(例如插入或更新)的字段不感兴趣。
create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');
create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');
第二步:为付款发送者创建流
注意:根据我从文档中收集到的信息,并通过实验发现,为了能够将支付流加入支付方流,后者需要按支付 ID 进行分区。此外,我让连接起作用的唯一方法是重命名该列。
create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;
第三步:加入两个流
注意:我使用的是左连接,因为并非所有参与方都出席了每笔付款。如上面的示例记录,其中 pmt02
没有中介。
create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;
现在,我希望从这个流中得到的输出是这样的:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
相反,我看到的输出是这样的:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
因此,我想问的第二个(two-part)问题是:为什么左连接会产生这些重复的记录?这可以避免吗?
对于文字墙的道歉,我试图在问题描述中尽可能完整。当然,我很乐意添加任何可能遗漏的信息,并尽我所知回答有关设置的问题。
你快到了:-)
WITHIN 1 SECONDS
将为您提供从连接的 双方 触发的结果。
改为尝试 WITHIN (0 SECONDS, 1 SECONDS)
。然后只有连接右侧的记录才会连接到左侧,反之亦然。
您可以在文章 I wrote here 中阅读有关此模式的更多信息。
顺便说一句,如果你想解决来自 OGG 的 table
保留字问题,你可以在 GG 配置中设置 includeTableName
to false
。