Best (PostgreSQL?) Data Model and Processing for Incremental Entity Resolution/Record Linkage


我们正在尝试通过简单的相等比较来建立确定性实体 Resolution/Record 链接。增量地,在流事件上。我正试图弄清楚如何在 PostgreSQL 中考虑扩展。


1. EventID: 1, Cookie: A
2. EventID: 2, Cookie: B
3. EventID: 3, Cookie: A
4. EventID: 4, Cookie: B

如您所见,到目前为止我们有 2 个属性 - EventID 和 Cookie。从这个事件流中,我们希望有一些表示(数据模型)来描述 EventIDs 1 和 3 连接到 Cookie A,EventIDs 2 和 4 连接到 Cookie B。最简单的伪表示(这是我正在挣扎,也在处理)可以看起来像这样(大括号是集合):



5. EventID: 5, Cookie: A, Email: a@example.com
6. EventID: 6, Cookie: B, Email: b@emample.com




7. EventID: 7, Cookie: A, Phone: 1234




8. EventID: 8, Cookie: B, Phone: 1234

如您所见 - 通过这个新事件,我们发现了一个新的 link,我们现在知道这两个实体是相关联的:


这是一个问题陈述:给定事件流,其中每个事件都有自己唯一的 ID 和 1 到 N 个不同类型的标识符,我们如何进行解析(传递闭包的变体基本上?)在 PostgreSQL(或其他工具?我愿意接受建议)中,因此它可以通过增量处理扩展到 10 亿个事件和 1 亿个实体(以先到者为准)?

到目前为止我们尝试了什么? Python - 自定义算法。运行良好,但内存受限 - 我们使用 Python 字典来存储实体 ID 及其标识符,加上倒排字典来保存标识符及其实体 ID 以进行快速查找(O(1) 查找)。可以想象,在内存中保存这样的字典会消耗大量内存——在 3000 万个实体中,我们有 25GB 的数据。不会缩放。

PostgreSQL - 我使用与我描述的基本相同的 table 来保存实体。然后插入到此 table 的触发器查找 EventID 或 EntityID 与我们想​​要插入相同的所有候选实体,从实体 table 中删除它们,将结果与要插入的新行合并并插入这个新的合并实体。运行良好,无法扩展 - 合并实体时的竞争条件。

Apache Spark - 我将事件流转换成一个图,其中节点是标识符(Phone、电子邮件等),边被定义为“标识符与同一事件中的其他标识符一起出现)并使用了 GraphFrame 的连接组件算法。效果很好,但它是一个批处理每个 运行 的整个历史记录。但我希望它是增量的。

如果您对我使用的代码感兴趣,请随时询问。为了这个 post 并保持足够长的时间,我选择现在不包含它。



您是否考虑过将 EventID 视为另一个 Identifier(例如“EID|1”)会简化倒排索引的维护?

使用此 PostgreSQL table 作为基础:

                 Table "public.matching"
    Column     |  Type   | Collation | Nullable | Default 
 identifier    | text    |           | not null | 
 grouping_id   | integer |           | not null | 
 event_id_orig | integer |           | not null | 
    "pk_matching" PRIMARY KEY, btree (identifier)
    "idx_matching_grouping_id" hash (grouping_id)
    "idx_matching_hash_pk" hash (identifier)

当有新记录到达时,创建 identifier 集合并将 EID|x 添加到集合中,然后 运行 this:

with event_ident as (
  select 1 as event_id, unnest('{"EID|1","Cookie|A"}'::text[]) as identifier
), upd_crit as (
  select distinct m.grouping_id, min(m.grouping_id) over () as min_grouping_id
    from matching m
    join event_ident e on e.identifier = m.identifier
), upd_run as (
  update matching
     set grouping_id = upd_crit.min_grouping_id
    from upd_crit
   where upd_crit.grouping_id = matching.grouping_id
     and upd_crit.min_grouping_id != matching.grouping_id -- Added in response to comment
), insert_run as (
  insert into matching
    (identifier, grouping_id, event_id_orig)
  select e.identifier,
         coalesce(u.min_grouping_id, e.event_id) as grouping_id,
         e.event_id as event_id_orig
    from event_ident e
   cross join (select min(min_grouping_id) as min_grouping_id from upd_crit) u
  on conflict (identifier) do nothing
  returning *
select * from insert_run;


=# select * from matching;
 identifier | grouping_id | event_id_orig 
 EID|3      |           1 |             3
 EID|1      |           1 |             1
 Cookie|A   |           1 |             1
 EID|4      |           2 |             4
 EID|2      |           2 |             2
 Cookie|B   |           2 |             2
(6 rows)


=# select * from matching;
     identifier      | grouping_id | event_id_orig 
 EID|5               |           1 |             5
 Email|a@example.com |           1 |             5
 EID|3               |           1 |             3
 EID|1               |           1 |             1
 Cookie|A            |           1 |             1
 EID|6               |           2 |             6
 Email|b@example.com |           2 |             6
 EID|4               |           2 |             4
 EID|2               |           2 |             2
 Cookie|B            |           2 |             2
(10 rows)

EventId7 之后:

=# select * from matching;
     identifier      | grouping_id | event_id_orig 
 EID|6               |           2 |             6
 Email|b@example.com |           2 |             6
 EID|4               |           2 |             4
 EID|2               |           2 |             2
 Cookie|B            |           2 |             2
 EID|7               |           1 |             7
 Phone|1234          |           1 |             7
 EID|5               |           1 |             5
 Email|a@example.com |           1 |             5
 EID|3               |           1 |             3
 EID|1               |           1 |             1
 Cookie|A            |           1 |             1
(12 rows)


=# select * from matching;
     identifier      | grouping_id | event_id_orig 
 EID|8               |           1 |             8
 EID|6               |           1 |             6
 Email|b@example.com |           1 |             6
 EID|4               |           1 |             4
 EID|2               |           1 |             2
 Cookie|B            |           1 |             2
 EID|7               |           1 |             7
 Phone|1234          |           1 |             7
 EID|5               |           1 |             5
 Email|a@example.com |           1 |             5
 EID|3               |           1 |             3
 EID|1               |           1 |             1
 Cookie|A            |           1 |             1
(13 rows)


select grouping_id, 
       array_agg(distinct event_id_orig) as event_ids, 
       array_agg(identifier) filter (where identifier not like 'EID|%') as identifiers
  from matching
 group by grouping_id;
-[ RECORD 1 ]-----------------------------------------------------------------------
grouping_id | 1
event_ids   | {1,2,3,4,5,6,7,8}
identifiers | {Email|b@example.com,Cookie|B,Phone|1234,Email|a@example.com,Cookie|A}