How to write BigQuery results to GCS in JSON format using Apache Beam with custom formatting?
I am trying to write BigQuery table records to a JSON file in a GCS bucket using Apache Beam in Python.
I have a BigQuery table - my_project.my_dataset.my_table - like this.
I want to write the table records/entries to a JSON file at the GCS bucket location "gs://my_core_bucket/data/my_data.json".
Expected JSON format:
[
{"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}},
{"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}},
{"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}},
{"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}
]
However, with my current Apache Beam pipeline implementation, I see that the JSON file created at "gs://my_core_bucket/data/my_data.json" has entries like this:
{"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}}
{"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}}
{"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}}
{"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}
How do I create a clean JSON file with the BigQuery records as JSON array elements?
Here is my pipeline code.
import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrepareData(beam.DoFn):

    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
        rec_columns = ["id", "name", "address", "phn", "country", "age"]  # all columns of the bigquery table
        rec_keys = list(record.keys())  # ["id", "name", "address", "phn"] # columns needed for processing

        values = {}
        for x in range(len(rec_keys)):
            key = rec_keys[x]
            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:

    def run_pipe(self):
        try:
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"

            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]

            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            bq_data = p | 'Read Table' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))

            # bq_data will be in the form
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}

            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # write formatted pcollection as JSON file
            prepared_data | 'JSON format' >> beam.Map(json.dumps)
            prepared_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()
As suggested in the comments, try combining all the results into one. To successfully serialize the set obtained from the combine step, you can use a custom serializer.
Your code could look like this:
import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


# custom JSON encoder that serializes sets as lists
class SetEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)


# utility CombineFn that gathers all elements into a single list
class ListCombineFn(beam.CombineFn):

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, input):
        accumulator.append(input)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for accum in accumulators:
            merged += accum
        return merged

    def extract_output(self, accumulator):
        return accumulator


class PrepareData(beam.DoFn):

    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
        rec_columns = ["id", "name", "address", "phn", "country", "age"]  # all columns of the bigquery table
        rec_keys = list(record.keys())  # ["id", "name", "address", "phn"] # columns needed for processing

        values = {}
        for x in range(len(rec_keys)):
            key = rec_keys[x]
            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:

    def run_pipe(self):
        try:
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"

            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]

            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            bq_data = p | 'Read Table' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))

            # bq_data will be in the form
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}

            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # combine all the results into a single list element
            # see https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/
            combined_data = prepared_data | 'Combine results' >> beam.CombineGlobally(ListCombineFn())

            # serialize the combined list as JSON, using the custom encoder for sets
            json_data = combined_data | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder)

            # write the formatted pcollection as a single JSON file
            json_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()
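Two notes on this approach. CombineGlobally with ListCombineFn gathers every element of the PCollection into a single in-memory list, so the final JSON array is built on one worker; that is fine for a small result set like this one but will not scale to very large tables. Also, because the combined PCollection contains exactly one element, shard_name_template='' safely yields a single output file, gs://my_core_bucket/data/my_data.json, with the whole array on one line.

For what it's worth, Beam's built-in ToList combiner should be interchangeable with the custom ListCombineFn here:

# equivalent, using the built-in combiner instead of ListCombineFn
combined_data = prepared_data | 'Combine results' >> beam.combiners.ToList()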
You can do this directly in BigQuery and then simply have Dataflow print the result.
Only change the query:
query = f"Select ARRAY_AGG(str) from (SELECT struct(id as id, name as name, address as address, phn as phn) as str FROM `{project}.{dataset}.{table}` LIMIT 4)"
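For reference, here is a minimal, untested sketch of how the Beam side might consume that query. It assumes the aggregated column gets an explicit alias (json_rows is a hypothetical name), reuses the project, dataset, table, argv and file_name variables from the code above, and applies ReadFromBigQuery directly as a transform. With the aggregation done in BigQuery, the pipeline reduces to a read, a json.dumps, and a write:

import json

import apache_beam as beam

# json_rows is a hypothetical alias for the aggregated column
query = (
    f"SELECT ARRAY_AGG(str) AS json_rows "
    f"FROM (SELECT STRUCT(id AS id, name AS name, address AS address, phn AS phn) AS str "
    f"FROM `{project}.{dataset}.{table}` LIMIT 4)"
)

with beam.Pipeline(argv=argv) as p:
    (p
     | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
     # the query returns a single row whose 'json_rows' field is a list of dicts
     | 'JSON format' >> beam.Map(lambda row: json.dumps(row['json_rows']))
     | 'Write Output' >> beam.io.WriteToText(file_name,
                                             file_name_suffix=".json",
                                             shard_name_template=''))

Note that this query returns the flat fields aggregated into an array; it does not reproduce the nested "values" object from the question, so the PrepareData shaping would still have to happen either in the query or in the pipeline.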
Keep in mind:
- BigQuery processing will always be faster and cheaper than Dataflow processing (or any other processing on equivalent hardware)
- Dataflow will always build a valid JSON (yours is not valid: you can't start with an array)