增量实体的最佳(PostgreSQL?)数据模型和处理 Resolution/Record 链接
Best (PostgreSQL?) Data Model and Processing for Incremental Entity Resolution/Record Linkage
我正在解决一个问题,想听听您的意见。
我们正在尝试通过简单的相等比较来建立确定性实体 Resolution/Record 链接。增量地,在流事件上。我正试图弄清楚如何在 PostgreSQL 中考虑扩展。
假设您有一连串的事件。为了举例,让我们坚持使用来自网络跟踪的事件示例。事件可能如下(只留下最重要的部分,不留全部payload):
1. EventID: 1, Cookie: A
2. EventID: 2, Cookie: B
3. EventID: 3, Cookie: A
4. EventID: 4, Cookie: B
如您所见,到目前为止我们有 2 个属性 - EventID 和 Cookie。从这个事件流中,我们希望有一些表示(数据模型)来描述 EventIDs 1 和 3 连接到 Cookie A,EventIDs 2 和 4 连接到 Cookie B。最简单的伪表示(这是我正在挣扎,也在处理)可以看起来像这样(大括号是集合):
EventIDs,Identifiers
{"1","3"},{"Cookie|A"}
{"2","4"},{"Cookie|B"}
让我们再来一轮:
5. EventID: 5, Cookie: A, Email: a@example.com
6. EventID: 6, Cookie: B, Email: b@emample.com
这一轮之后,实体表示将如下所示:
EventIDs,Identifiers
{"1","3","5"},{"Cookie|A","Email|a@example.com"}
{"2","4","6"},{"Cookie|B","Email|b@example.com"}
到目前为止一切顺利。另一项设置压轴的活动:
7. EventID: 7, Cookie: A, Phone: 1234
实体:
EventIDs,Identifiers
{"1","3","5","7"},{"Cookie|A","Email|a@example.com","Phone|1234"}
{"2","4","6"},{"Cookie|B","Email|b@example.com"}
大结局:
8. EventID: 8, Cookie: B, Phone: 1234
如您所见 - 通过这个新事件,我们发现了一个新的 link,我们现在知道这两个实体是相关联的:
EventIDs,Identifiers
{"1","2","3","4","5","6","7","8"},{"Cookie|A","Cookie|B","Email|a@example.com","Email|b@example.com","Phone|1234"}
这是一个问题陈述:给定事件流,其中每个事件都有自己唯一的 ID 和 1 到 N 个不同类型的标识符,我们如何进行解析(传递闭包的变体基本上?)在 PostgreSQL(或其他工具?我愿意接受建议)中,因此它可以通过增量处理扩展到 10 亿个事件和 1 亿个实体(以先到者为准)?
到目前为止我们尝试了什么?
Python - 自定义算法。运行良好,但内存受限 - 我们使用 Python 字典来存储实体 ID 及其标识符,加上倒排字典来保存标识符及其实体 ID 以进行快速查找(O(1) 查找)。可以想象,在内存中保存这样的字典会消耗大量内存——在 3000 万个实体中,我们有 25GB 的数据。不会缩放。
PostgreSQL - 我使用与我描述的基本相同的 table 来保存实体。然后插入到此 table 的触发器查找 EventID 或 EntityID 与我们想要插入相同的所有候选实体,从实体 table 中删除它们,将结果与要插入的新行合并并插入这个新的合并实体。运行良好,无法扩展 - 合并实体时的竞争条件。
Apache Spark - 我将事件流转换成一个图,其中节点是标识符(Phone、电子邮件等),边被定义为“标识符与同一事件中的其他标识符一起出现)并使用了 GraphFrame 的连接组件算法。效果很好,但它是一个批处理每个 运行 的整个历史记录。但我希望它是增量的。
如果您对我使用的代码感兴趣,请随时询问。为了这个 post 并保持足够长的时间,我选择现在不包含它。
如果有任何能促使我们找到解决方案的指示、讨论和建议,我将不胜感激。
非常感谢,期待讨论!
您是否考虑过将 EventID
视为另一个 Identifier
(例如“EID|1”)会简化倒排索引的维护?
使用此 PostgreSQL table 作为基础:
Table "public.matching"
Column | Type | Collation | Nullable | Default
---------------+---------+-----------+----------+---------
identifier | text | | not null |
grouping_id | integer | | not null |
event_id_orig | integer | | not null |
Indexes:
"pk_matching" PRIMARY KEY, btree (identifier)
"idx_matching_grouping_id" hash (grouping_id)
"idx_matching_hash_pk" hash (identifier)
当有新记录到达时,创建 identifier
集合并将 EID|x
添加到集合中,然后 运行 this:
with event_ident as (
select 1 as event_id, unnest('{"EID|1","Cookie|A"}'::text[]) as identifier
), upd_crit as (
select distinct m.grouping_id, min(m.grouping_id) over () as min_grouping_id
from matching m
join event_ident e on e.identifier = m.identifier
), upd_run as (
update matching
set grouping_id = upd_crit.min_grouping_id
from upd_crit
where upd_crit.grouping_id = matching.grouping_id
and upd_crit.min_grouping_id != matching.grouping_id -- Added in response to comment
), insert_run as (
insert into matching
(identifier, grouping_id, event_id_orig)
select e.identifier,
coalesce(u.min_grouping_id, e.event_id) as grouping_id,
e.event_id as event_id_orig
from event_ident e
cross join (select min(min_grouping_id) as min_grouping_id from upd_crit) u
on conflict (identifier) do nothing
returning *
)
select * from insert_run;
在你的前四场比赛之后,我有这个:
=# select * from matching;
identifier | grouping_id | event_id_orig
------------+-------------+---------------
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
EID|4 | 2 | 4
EID|2 | 2 | 2
Cookie|B | 2 | 2
(6 rows)
回合结束后电子邮件地址:
=# select * from matching;
identifier | grouping_id | event_id_orig
---------------------+-------------+---------------
EID|5 | 1 | 5
Email|a@example.com | 1 | 5
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
EID|6 | 2 | 6
Email|b@example.com | 2 | 6
EID|4 | 2 | 4
EID|2 | 2 | 2
Cookie|B | 2 | 2
(10 rows)
EventId
7 之后:
=# select * from matching;
identifier | grouping_id | event_id_orig
---------------------+-------------+---------------
EID|6 | 2 | 6
Email|b@example.com | 2 | 6
EID|4 | 2 | 4
EID|2 | 2 | 2
Cookie|B | 2 | 2
EID|7 | 1 | 7
Phone|1234 | 1 | 7
EID|5 | 1 | 5
Email|a@example.com | 1 | 5
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
(12 rows)
添加您的压轴记录后:
=# select * from matching;
identifier | grouping_id | event_id_orig
---------------------+-------------+---------------
EID|8 | 1 | 8
EID|6 | 1 | 6
Email|b@example.com | 1 | 6
EID|4 | 1 | 4
EID|2 | 1 | 2
Cookie|B | 1 | 2
EID|7 | 1 | 7
Phone|1234 | 1 | 7
EID|5 | 1 | 5
Email|a@example.com | 1 | 5
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
(13 rows)
以您使用的格式检索:
select grouping_id,
array_agg(distinct event_id_orig) as event_ids,
array_agg(identifier) filter (where identifier not like 'EID|%') as identifiers
from matching
group by grouping_id;
-[ RECORD 1 ]-----------------------------------------------------------------------
grouping_id | 1
event_ids | {1,2,3,4,5,6,7,8}
identifiers | {Email|b@example.com,Cookie|B,Phone|1234,Email|a@example.com,Cookie|A}
我正在解决一个问题,想听听您的意见。
我们正在尝试通过简单的相等比较来建立确定性实体 Resolution/Record 链接。增量地,在流事件上。我正试图弄清楚如何在 PostgreSQL 中考虑扩展。
假设您有一连串的事件。为了举例,让我们坚持使用来自网络跟踪的事件示例。事件可能如下(只留下最重要的部分,不留全部payload):
1. EventID: 1, Cookie: A
2. EventID: 2, Cookie: B
3. EventID: 3, Cookie: A
4. EventID: 4, Cookie: B
如您所见,到目前为止我们有 2 个属性 - EventID 和 Cookie。从这个事件流中,我们希望有一些表示(数据模型)来描述 EventIDs 1 和 3 连接到 Cookie A,EventIDs 2 和 4 连接到 Cookie B。最简单的伪表示(这是我正在挣扎,也在处理)可以看起来像这样(大括号是集合):
EventIDs,Identifiers
{"1","3"},{"Cookie|A"}
{"2","4"},{"Cookie|B"}
让我们再来一轮:
5. EventID: 5, Cookie: A, Email: a@example.com
6. EventID: 6, Cookie: B, Email: b@emample.com
这一轮之后,实体表示将如下所示:
EventIDs,Identifiers
{"1","3","5"},{"Cookie|A","Email|a@example.com"}
{"2","4","6"},{"Cookie|B","Email|b@example.com"}
到目前为止一切顺利。另一项设置压轴的活动:
7. EventID: 7, Cookie: A, Phone: 1234
实体:
EventIDs,Identifiers
{"1","3","5","7"},{"Cookie|A","Email|a@example.com","Phone|1234"}
{"2","4","6"},{"Cookie|B","Email|b@example.com"}
大结局:
8. EventID: 8, Cookie: B, Phone: 1234
如您所见 - 通过这个新事件,我们发现了一个新的 link,我们现在知道这两个实体是相关联的:
EventIDs,Identifiers
{"1","2","3","4","5","6","7","8"},{"Cookie|A","Cookie|B","Email|a@example.com","Email|b@example.com","Phone|1234"}
这是一个问题陈述:给定事件流,其中每个事件都有自己唯一的 ID 和 1 到 N 个不同类型的标识符,我们如何进行解析(传递闭包的变体基本上?)在 PostgreSQL(或其他工具?我愿意接受建议)中,因此它可以通过增量处理扩展到 10 亿个事件和 1 亿个实体(以先到者为准)?
到目前为止我们尝试了什么? Python - 自定义算法。运行良好,但内存受限 - 我们使用 Python 字典来存储实体 ID 及其标识符,加上倒排字典来保存标识符及其实体 ID 以进行快速查找(O(1) 查找)。可以想象,在内存中保存这样的字典会消耗大量内存——在 3000 万个实体中,我们有 25GB 的数据。不会缩放。
PostgreSQL - 我使用与我描述的基本相同的 table 来保存实体。然后插入到此 table 的触发器查找 EventID 或 EntityID 与我们想要插入相同的所有候选实体,从实体 table 中删除它们,将结果与要插入的新行合并并插入这个新的合并实体。运行良好,无法扩展 - 合并实体时的竞争条件。
Apache Spark - 我将事件流转换成一个图,其中节点是标识符(Phone、电子邮件等),边被定义为“标识符与同一事件中的其他标识符一起出现)并使用了 GraphFrame 的连接组件算法。效果很好,但它是一个批处理每个 运行 的整个历史记录。但我希望它是增量的。
如果您对我使用的代码感兴趣,请随时询问。为了这个 post 并保持足够长的时间,我选择现在不包含它。
如果有任何能促使我们找到解决方案的指示、讨论和建议,我将不胜感激。
非常感谢,期待讨论!
您是否考虑过将 EventID
视为另一个 Identifier
(例如“EID|1”)会简化倒排索引的维护?
使用此 PostgreSQL table 作为基础:
Table "public.matching"
Column | Type | Collation | Nullable | Default
---------------+---------+-----------+----------+---------
identifier | text | | not null |
grouping_id | integer | | not null |
event_id_orig | integer | | not null |
Indexes:
"pk_matching" PRIMARY KEY, btree (identifier)
"idx_matching_grouping_id" hash (grouping_id)
"idx_matching_hash_pk" hash (identifier)
当有新记录到达时,创建 identifier
集合并将 EID|x
添加到集合中,然后 运行 this:
with event_ident as (
select 1 as event_id, unnest('{"EID|1","Cookie|A"}'::text[]) as identifier
), upd_crit as (
select distinct m.grouping_id, min(m.grouping_id) over () as min_grouping_id
from matching m
join event_ident e on e.identifier = m.identifier
), upd_run as (
update matching
set grouping_id = upd_crit.min_grouping_id
from upd_crit
where upd_crit.grouping_id = matching.grouping_id
and upd_crit.min_grouping_id != matching.grouping_id -- Added in response to comment
), insert_run as (
insert into matching
(identifier, grouping_id, event_id_orig)
select e.identifier,
coalesce(u.min_grouping_id, e.event_id) as grouping_id,
e.event_id as event_id_orig
from event_ident e
cross join (select min(min_grouping_id) as min_grouping_id from upd_crit) u
on conflict (identifier) do nothing
returning *
)
select * from insert_run;
在你的前四场比赛之后,我有这个:
=# select * from matching;
identifier | grouping_id | event_id_orig
------------+-------------+---------------
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
EID|4 | 2 | 4
EID|2 | 2 | 2
Cookie|B | 2 | 2
(6 rows)
回合结束后电子邮件地址:
=# select * from matching;
identifier | grouping_id | event_id_orig
---------------------+-------------+---------------
EID|5 | 1 | 5
Email|a@example.com | 1 | 5
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
EID|6 | 2 | 6
Email|b@example.com | 2 | 6
EID|4 | 2 | 4
EID|2 | 2 | 2
Cookie|B | 2 | 2
(10 rows)
EventId
7 之后:
=# select * from matching;
identifier | grouping_id | event_id_orig
---------------------+-------------+---------------
EID|6 | 2 | 6
Email|b@example.com | 2 | 6
EID|4 | 2 | 4
EID|2 | 2 | 2
Cookie|B | 2 | 2
EID|7 | 1 | 7
Phone|1234 | 1 | 7
EID|5 | 1 | 5
Email|a@example.com | 1 | 5
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
(12 rows)
添加您的压轴记录后:
=# select * from matching;
identifier | grouping_id | event_id_orig
---------------------+-------------+---------------
EID|8 | 1 | 8
EID|6 | 1 | 6
Email|b@example.com | 1 | 6
EID|4 | 1 | 4
EID|2 | 1 | 2
Cookie|B | 1 | 2
EID|7 | 1 | 7
Phone|1234 | 1 | 7
EID|5 | 1 | 5
Email|a@example.com | 1 | 5
EID|3 | 1 | 3
EID|1 | 1 | 1
Cookie|A | 1 | 1
(13 rows)
以您使用的格式检索:
select grouping_id,
array_agg(distinct event_id_orig) as event_ids,
array_agg(identifier) filter (where identifier not like 'EID|%') as identifiers
from matching
group by grouping_id;
-[ RECORD 1 ]-----------------------------------------------------------------------
grouping_id | 1
event_ids | {1,2,3,4,5,6,7,8}
identifiers | {Email|b@example.com,Cookie|B,Phone|1234,Email|a@example.com,Cookie|A}