How to merge two files and then view the PCollection (Apache Beam)
I have two csv files that I need to join on a common column using Beam (Python SDK). The files look like this:
users_v.csv
user_id,name,gender,age,address,date_joined
1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13
2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06
orders_v.csv
order_no,user_id,product_list,date_purchased
1000,1887,Cassava,2000-01-01
1001,838,"Calabash, Water Spinach",2000-01-01
I tried the following, which seems to work (no errors), but I can't view the resulting PCollection with beam.Map(print):
import apache_beam as beam

with beam.Pipeline() as pipeline:
    orders = p | "Read orders" >> beam.io.ReadFromText("orders_v.csv")
    users = p | "Read users" >> beam.io.ReadFromText("users_v.csv")
    {"orders": orders, "users": users} | beam.CoGroupByKey() | beam.Map(print)
How can I print out the resulting PCollection?
There are a few errors in the code:
1 - You use pipeline in the with statement, but then use p as the pipeline variable.
2 - The dictionary before CoGroupByKey determines the names of the cogrouped values, but each input still needs to be a collection of key-value pairs for the join to work.
3 - I assume you want to skip the headers.
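To illustrate point 2: CoGroupByKey expects every input PCollection to contain (key, value) tuples, and it collects the values per key, per input name. A rough plain-Python model of that behavior (my own illustrative helper, not Beam's implementation):

```python
from collections import defaultdict

def cogroup(named_inputs):
    # Plain-Python model of CoGroupByKey: each input is a list of
    # (key, value) pairs; values are collected per key, per input name.
    grouped = defaultdict(lambda: {name: [] for name in named_inputs})
    for name, pairs in named_inputs.items():
        for key, value in pairs:
            grouped[key][name].append(value)
    return dict(grouped)

orders = [("1887", "1000,1887,Cassava,2000-01-01")]
users = [("1", "1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13")]
print(cogroup({"orders": orders, "users": users}))
```

A key that appears in only one input still shows up in the result, with an empty list for the other input, which is exactly the shape of the output below.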
The code should look like the following. The function split_by_kv is far from perfect; you'll need to improve it so the key is retrieved more robustly (since some of your fields may contain ,).
import apache_beam as beam
from apache_beam import CoGroupByKey, Map
from apache_beam.io import ReadFromText

def split_by_kv(element, index, delimiter=","):
    # Need a better approach here
    splitted = element.split(delimiter)
    return splitted[index], element

with beam.Pipeline() as p:
    orders = (p | "Read orders" >> ReadFromText("files/orders_v.csv", skip_header_lines=1)
                | "to KV order" >> Map(split_by_kv, index=1, delimiter=",")
    )
    users = (p | "Read users" >> ReadFromText("files/users_v.csv", skip_header_lines=1)
               | "to KV users" >> Map(split_by_kv, index=0, delimiter=",")
    )
    ({"orders": orders, "users": users} | CoGroupByKey()
        | Map(print)
    )
The output is (key, {'orders': order values for that key, 'users': user values for that key}). Note that in your sample data no order's user_id matches an existing user, so one side of each group is empty:
('1887', {'orders': ['1000,1887,Cassava,2000-01-01'], 'users': []})
('838', {'orders': ['1001,838,"Calabash, Water Spinach",2000-01-01'], 'users': []})
('1', {'orders': [], 'users': ['1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13']})
('2', {'orders': [], 'users': ['2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06']})
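As noted above, naively splitting on "," breaks on quoted fields such as "Calabash, Water Spinach". One way to extract the key more robustly is to parse each line with Python's csv module (the name split_by_kv_csv is my own; this is a sketch, not tested against all your data):

```python
import csv

def split_by_kv_csv(element, index):
    # Parse the line with the csv module so that quoted fields
    # containing commas are kept as a single field.
    fields = next(csv.reader([element]))
    return fields[index], element

line = '1001,838,"Calabash, Water Spinach",2000-01-01'
print(split_by_kv_csv(line, index=1))
# ('838', '1001,838,"Calabash, Water Spinach",2000-01-01')
```

You could drop this in as a replacement for split_by_kv in the "to KV" steps (it takes index but no delimiter argument).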
Also, you may want to take a look at the new DataFrames API.