Apache Beam | Python | Dataflow - How to join BigQuery' collections with different keys?
I'm running into the following problem. For a specific case, I'm trying to do an INNER JOIN between two tables from Google BigQuery in Apache Beam (Python) on Dataflow. However, I haven't found a native way to handle it easily.
The output of this query will populate a third table in Google BigQuery, and for this case I really need it to run on Google Dataflow. The key of the first table (clients) is the "id" column, and the key of the second table (purchases) is the "client_id" column.
1. Example tables (consider 'client_table.id = purchase_table.client_id'):
client_table
| id | name | country |
|----|-------------|---------|
| 1 | first user | usa |
| 2 | second user | usa |
purchase_table
| id | client_id | value |
|----|-------------|---------|
| 1 | 1 | 15 |
| 2 | 1 | 120 |
| 3 | 2 | 190 |
2. Code I'm trying to develop (the problem is in the second line of 'output'):
options = {'project': PROJECT,
           'runner': RUNNER,
           'region': REGION,
           'staging_location': 'gs://bucket/temp',
           'temp_location': 'gs://bucket/temp',
           'template_location': 'gs://bucket/temp/test_join'}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options=pipeline_options)
query_results_1 = (
    pipeline
    | 'ReadFromBQ_1' >> beam.io.ReadFromBigQuery(query="select id as client_id, name from client_table", use_standard_sql=True))
query_results_2 = (
    pipeline
    | 'ReadFromBQ_2' >> beam.io.ReadFromBigQuery(query="select * from purchase_table", use_standard_sql=True))
output = ({'query_results_1': query_results_1, 'query_results_2': query_results_2}
          | 'join' >> beam.GroupBy('client_id')  # <- this is the line that fails
          | 'writeToBQ' >> beam.io.WriteToBigQuery(
              table=TABLE,
              dataset=DATASET,
              project=PROJECT,
              schema=SCHEMA,
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
              write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
pipeline.run()
3. Desired output, in equivalent SQL:
SELECT a.name, b.value FROM client_table AS a INNER JOIN purchase_table AS b ON a.id = b.client_id;
You can use either CoGroupByKey or side inputs (as a broadcast join), depending on your key cardinality. If you have few keys, each with many elements, I'd suggest the broadcast join.
The first thing you need to do is add a key to your PCollections after the BQ reads:
kv_1 = query_results_1 | Map(lambda x: (x["client_id"], x))  # "id" is aliased to "client_id" in the query
kv_2 = query_results_2 | Map(lambda x: (x["client_id"], x))
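To see the element shape a CoGroupByKey over these two keyed PCollections would hand to your join function, here is a plain-Python emulation (no Beam; the sample rows are made up to match the two queries): each key maps to a dict of per-tag lists.

```python
from collections import defaultdict

# Hypothetical rows, matching the shapes returned by the two BQ queries above
clients = [{"client_id": 1, "name": "first user"},
           {"client_id": 2, "name": "second user"}]
purchases = [{"id": 1, "client_id": 1, "value": 15},
             {"id": 2, "client_id": 1, "value": 120},
             {"id": 3, "client_id": 2, "value": 190}]

def cogroup(sources):
    """Emulate CoGroupByKey over a dict of tagged (key, value) lists."""
    grouped = defaultdict(lambda: {tag: [] for tag in sources})
    for tag, rows in sources.items():
        for key, value in rows:
            grouped[key][tag].append(value)
    return dict(grouped)

kv_1 = [(row["client_id"], row) for row in clients]
kv_2 = [(row["client_id"], row) for row in purchases]
cogbk = cogroup({"query_results_1": kv_1, "query_results_2": kv_2})

# Each element looks like:
# (1, {'query_results_1': [{...client...}], 'query_results_2': [{...}, {...}]})
```

This is only an illustration of the grouped structure; in the pipeline the grouping is done by the CoGroupByKey transform itself.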
After that you can do either the CoGBK or the broadcast join. As an example (since it's easier to follow), I'm going to use the code from a Beam College session. Note that in your case the value of each KV pair is a dictionary, so you'd need to make some modifications.
Data
jobs = [
    ("John", "Data Scientist"),
    ("Rebecca", "Full Stack Engineer"),
    ("John", "Data Engineer"),
    ("Alice", "CEO"),
    ("Charles", "Web Designer"),
    ("Ruben", "Tech Writer")
]
hobbies = [
    ("John", "Baseball"),
    ("Rebecca", "Football"),
    ("John", "Piano"),
    ("Alice", "Photoshop"),
    ("Charles", "Coding"),
    ("Rebecca", "Acting"),
    ("Rebecca", "Reading")
]
Join with CoGBK
def inner_join(element):
    name = element[0]
    jobs = element[1]["jobs"]
    hobbies = element[1]["hobbies"]
    joined = [{"name": name,
               "job": job,
               "hobbie": hobbie}
              for job in jobs for hobbie in hobbies]
    return joined
jobs_create = p | "Create Jobs" >> Create(jobs)
hobbies_create = p | "Create Hobbies" >> Create(hobbies)
cogbk = {"jobs": jobs_create, "hobbies": hobbies_create} | CoGroupByKey()
join = cogbk | FlatMap(inner_join)
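For intuition, you can check what this pipeline produces with a plain-Python emulation (no Beam needed): group both lists by name, then apply the same per-key Cartesian product as inner_join. Names missing from either side contribute no rows, which is exactly the inner-join semantics.

```python
from collections import defaultdict

jobs = [("John", "Data Scientist"), ("Rebecca", "Full Stack Engineer"),
        ("John", "Data Engineer"), ("Alice", "CEO"),
        ("Charles", "Web Designer"), ("Ruben", "Tech Writer")]
hobbies = [("John", "Baseball"), ("Rebecca", "Football"), ("John", "Piano"),
           ("Alice", "Photoshop"), ("Charles", "Coding"),
           ("Rebecca", "Acting"), ("Rebecca", "Reading")]

# Emulate CoGroupByKey: one entry per name, with the per-tag lists
grouped = defaultdict(lambda: {"jobs": [], "hobbies": []})
for name, job in jobs:
    grouped[name]["jobs"].append(job)
for name, hobby in hobbies:
    grouped[name]["hobbies"].append(hobby)

# Same logic as inner_join, flattened over all keys
joined = [{"name": name, "job": j, "hobbie": h}
          for name, lists in grouped.items()
          for j in lists["jobs"]
          for h in lists["hobbies"]]

# Ruben has a job but no hobbies, so he produces no rows.
```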
Broadcast join with side inputs
def broadcast_inner_join(element, side_input):
    name = element[0]
    job = element[1]
    hobbies = side_input.get(name, [])
    joined = [{"name": name,
               "job": job,
               "hobbie": hobbie}
              for hobbie in hobbies]
    return joined
hobbies_create = (p | "Create Hobbies" >> Create(hobbies)
                    | beam.GroupByKey()
                  )
jobs_create = p | "Create Jobs" >> Create(jobs)
broadcast_join = jobs_create | FlatMap(broadcast_inner_join,
                                       side_input=pvalue.AsDict(hobbies_create))
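Back to your BigQuery case: since each KV value is a dict, the broadcast variant needs small changes. Here's a minimal plain-Python sketch of the per-element logic (sample rows are made up; in Beam this function would run inside a FlatMap, with the client table passed as a pvalue.AsDict side input), producing exactly the rows of the SQL in the question.

```python
# Hypothetical rows, matching the question's two queries
clients = [{"client_id": 1, "name": "first user"},
           {"client_id": 2, "name": "second user"}]
purchases = [{"id": 1, "client_id": 1, "value": 15},
             {"id": 2, "client_id": 1, "value": 120},
             {"id": 3, "client_id": 2, "value": 190}]

# Side input: the smaller table as a dict keyed by client_id
side_input = {c["client_id"]: c for c in clients}

def broadcast_inner_join(purchase, side_input):
    client = side_input.get(purchase["client_id"])
    if client is None:  # no matching client -> drop the row (inner join)
        return []
    return [{"name": client["name"], "value": purchase["value"]}]

output = [row for p in purchases for row in broadcast_inner_join(p, side_input)]
```

In the pipeline this would look like `purchases_pcoll | FlatMap(broadcast_inner_join, side_input=pvalue.AsDict(kv_1))`, with kv_1 as the keyed client PCollection from above.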