Read and write Avro files by inferring schema using Python SDK in Google Cloud Dataflow - Apache Beam
Question: I am trying to create a Cloud Dataflow pipeline with the Python SDK that reads Avro files from Google Cloud Storage, does some processing, and writes Avro files back to Google Cloud Storage. After looking at some of the examples provided on the Apache Beam website, I tried running the code below. I used the ReadFromAvro and WriteToAvro functions. What I am trying to achieve is simply to read an Avro file and write back the same Avro file with Dataflow, but it gives me the following warning and does not output any Avro file.
Warning/Error:
/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py:121: DeprecationWarning: object() takes no parameters
super(GcsIO, cls).__new__(cls, storage_client))
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.31790304184 seconds
Traceback (most recent call last):
File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 52, in <module>
run()
File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 47, in run
records | WriteToAvro(known_args.output)
TypeError: __init__() takes at least 3 arguments (2 given)
Code:
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://BUCKET/000000_0.avro',
                      help='Input file to process.')
  parser.add_argument('--output',
                      dest='output',
                      default='gs://BUCKET/',
                      # required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
      # run your pipeline on the Google Cloud Dataflow Service.
      '--runner=DataflowRunner',
      # CHANGE 3/5: Your project ID is required in order to run your pipeline on
      # the Google Cloud Dataflow Service.
      '--project=PROJECT_NAME',
      # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
      # files.
      '--staging_location=gs://BUCKET/staging',
      # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
      # files.
      '--temp_location=gs://BUCKET/temp',
      '--job_name=parse-avro',
  ])
  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  # Read the avro file[pattern] into a PCollection.
  records = p | ReadFromAvro(known_args.input)
  records | WriteToAvro(known_args.output)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
Edit:
I tried adding the schema to the WriteToAvro function, but now I get the following error:
Error:
/usr/local/bin/python /Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <type 'NoneType'>.
warnings.warn('Using fallback coder for typehint: %r.' % typehint)
Schema:
{"fields": [{"default": null, "type": ["null", {"logicalType": "timestamp-millis", "type": "long"}], "name": "_col0"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col1"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col2"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col3"}, {"default": null, "type": ["null", "long"], "name": "_col4"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col5"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 10}], "name": "_col6"}, {"default": null, "type": ["null", "double"], "name": "_col7"}, {"default": null, "type": ["null", "long"], "name": "_col8"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col9"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col10"}], "type": "record", "name": "baseRecord"}
Code:
import avro.schema

pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
schema = avro.schema.parse(open("avro.avsc", "rb").read())

# Read the avro file[pattern] into a PCollection.
records = p | ReadFromAvro(known_args.input)
records | WriteToAvro(known_args.output, schema=schema)
The error indicates that your code did not pass all the required arguments to the constructor of the WriteToAvro() transform - indeed it requires at least 2 arguments (file name prefix and schema), but this code passes only 1 (file name prefix).
WriteToAvro currently requires the schema: it is not an optional parameter, and there is no workaround to avoid specifying it. The reason is that Avro files in general require knowing the schema in advance, before creating the file, so WriteToAvro also needs to know the schema.
Moreover, we cannot unambiguously infer the schema from the collection returned by ReadFromAvro: imagine a user passes as --input a filepattern that matches Avro files with several different schemas - which of those schemas would WriteToAvro then have to use?
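If the input really is a single Avro file, one way to avoid maintaining a separate .avsc file by hand is to read the writer schema out of the file header before constructing the pipeline and pass that object to WriteToAvro. Below is a minimal sketch, assuming the same avro package used elsewhere here; the local file path and the read_writer_schema helper name are only illustrative:

import avro.schema
from avro.datafile import DataFileReader
from avro.io import DatumReader


def read_writer_schema(path):
  """Return the writer schema stored in the header of an Avro file."""
  with open(path, 'rb') as f:
    reader = DataFileReader(f, DatumReader())
    # The schema is stored as a JSON string under the 'avro.schema' metadata key.
    schema_json = reader.get_meta('avro.schema')
    reader.close()
  return avro.schema.parse(schema_json)


# Illustrative local path; for a gs:// input you would first have to download
# a copy of the file (e.g. with gsutil or the GCS client library).
schema = read_writer_schema('000000_0.avro')

The parsed schema can then be passed to WriteToAvro via its schema parameter.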
The problem was that the data pipeline was not actually being executed. I managed to fix it. The solution is that you need to run the Beam pipeline in one of the following two ways:
Option 1:
p = beam.Pipeline(options=pipeline_options)
schema = avro.schema.parse(open("avro.avsc", "rb").read())
records = p | 'Read from Avro' >> ReadFromAvro(known_args.input)
# Write the file
records | 'Write to Avro' >> WriteToAvro(known_args.output, schema=schema, file_name_suffix='.avro')
# Run the pipeline
result = p.run()
result.wait_until_finish()
Option 2:
Execute the pipeline using Python's with keyword:
schema = avro.schema.parse(open("avro.avsc", "rb").read())

with beam.Pipeline(options=pipeline_options) as p:
  records = p | ReadFromAvro(known_args.input)
  records | WriteToAvro(known_args.output, schema=schema, file_name_suffix='.avro')
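With the with statement, the pipeline is submitted and waited on automatically when the block exits, so the explicit p.run() and wait_until_finish() calls from Option 1 are not needed.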