How to merge two files and then view the PCollection (Apache Beam)

I have two csv files that I need to merge on a common column using Beam (Python SDK). The files look like this:

users_v.csv

user_id,name,gender,age,address,date_joined
1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13
2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06

orders_v.csv

order_no,user_id,product_list,date_purchased
1000,1887,Cassava,2000-01-01
1001,838,"Calabash, Water Spinach",2000-01-01

I tried the following, which seems to work (no errors), but I cannot use beam.Map(print) to view the resulting PCollection:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  orders = p | "Read orders" >> beam.io.ReadFromText("orders_v.csv")
  users = p | "Read users" >> beam.io.ReadFromText("users_v.csv")
  {"orders": orders, "users": users} | beam.CoGroupByKey() | beam.Map(print)

How can I print out the resulting PCollection?

There are a few errors in the code:

1 - You open the pipeline as pipeline in the with statement, but then use p as the pipeline variable.

2 - The dictionary before CoGroupByKey determines the names in the cogrouped result, but each input still needs to be a collection of key-value pairs for the join to work.

3 - I guess you want to skip the headers.
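As a pure-Python illustration of point 2, CoGroupByKey consumes PCollections of (key, value) tuples; the sample row below is taken from orders_v.csv above:

```python
# CoGroupByKey expects each element to be a (key, value) tuple.
# Here the user_id column (index 1) becomes the key for an order row.
order_row = "1000,1887,Cassava,2000-01-01"
key_value = (order_row.split(",")[1], order_row)
print(key_value)  # ('1887', '1000,1887,Cassava,2000-01-01')
```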

The code should look like this. The function split_by_kv is far from perfect, and you will need to improve it to retrieve the keys more robustly (since some of your fields may contain a ,).

import apache_beam as beam
from apache_beam import CoGroupByKey, Map
from apache_beam.io import ReadFromText


def split_by_kv(element, index, delimiter=", "):
    # Need a better approach here: a plain split breaks on
    # quoted fields that contain the delimiter
    splitted = element.split(delimiter)
    return splitted[index], element
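One way to make the key extraction robust to quoted fields is to let Python's csv module do the parsing. This is a sketch (the name split_by_kv_csv is mine, not part of the answer above):

```python
import csv

def split_by_kv_csv(element, index):
    # Parse the line with the csv module so quoted fields that
    # contain commas (e.g. "Calabash, Water Spinach") stay intact.
    fields = next(csv.reader([element]))
    return fields[index], element

print(split_by_kv_csv('1001,838,"Calabash, Water Spinach",2000-01-01', index=1))
# ('838', '1001,838,"Calabash, Water Spinach",2000-01-01')
```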


with beam.Pipeline() as p:
    orders = (p | "Read orders" >> ReadFromText("files/orders_v.csv", skip_header_lines=1)
                | "to KV order" >> Map(split_by_kv, index=1, delimiter=",")
             )
    
    users = (p | "Read users" >> ReadFromText("files/users_v.csv", skip_header_lines=1)
               | "to KV users" >> Map(split_by_kv, index=0, delimiter=",")
            )
    
    ({"orders": orders, "users": users} | CoGroupByKey() 
                                        | Map(print)
    )

The output is (key, {"orders": values for the key, "users": values for the key}):

('1887', {'orders': ['1000,1887,Cassava,2000-01-01'], 'users': []})
('838', {'orders': ['1001,838,"Calabash, Water Spinach",2000-01-01'], 'users': []})
('1', {'orders': [], 'users': ['1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13']})
('2', {'orders': [], 'users': ['2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06']})

Also, you may want to take a look at the new DataFrames API.