Beam Python Dataflow, writing to BigQuery table with schema provided throws AttributeError: May not assign arbitrary value tpe to message
I am trying this code on Dataflow. It reads a csv file from a gs:// bucket, creates the BigQuery table and appends the data. The code is as follows:
from __future__ import absolute_import
import argparse
import logging
import os
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage
import regex as re
# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# details and location of source csv file
source_file = ['gs://xxx/DUMMY.csv']
class DataIngestion:
"""A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
return row
def run(argv=None):
"""Main entry point; defines and runs the pipeline."""
data_ingestion = DataIngestion()
p = beam.Pipeline(options=PipelineOptions())
(p
| 'Create PCollection' >> beam.Create(source_file)
| 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1) ## ignore the csv header
| 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) # s is each of the String elements read in the beam.io.ReadAllFromText transform, and we apply a lambda
| 'Write to BigQuery' >> beam.io.Write(
beam.io.WriteToBigQuery(
'DUMMY',
dataset='test',
schema = {
"fields": [
{
"name" : "ID",
"type" : "FLOAT",
"mode" : "REQUIRED"
},
{
"name" : "CLUSTERED",
"tpe" : "FLOAT",
"mode" : "NULLABLE"
},
{
"name" : "SCATTERED",
"tpe" : "FLOAT",
"mode" : "NULLABLE"
},
{
"name" : "RANDOMISED",
"tpe" : "FLOAT",
"mode" : "NULLABLE"
},
{
"name" : "RANDOM_STRING",
"tpe" : "STRING",
"mode" : "NULLABLE"
},
{
"name" : "SMALL_VC",
"tpe" : "INTEGER",
"mode" : "NULLABLE"
},
{
"name" : "PADDING",
"tpe" : "STRING",
"mode" : "NULLABLE"
}
]
},
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
The code throws this error:
File "/usr/local/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 976, in __setattr__
"to message %s" % (name, type(self).__name__))
AttributeError: May not assign arbitrary value tpe to message TableFieldSchema [while running 'Write to BigQuery/WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables']
If I try the following
schema='SCHEMA_AUTODETECT'
instead of supplying the JSON schema as above, it works fine. It also fails if I try to reference a JSON file in schema=. What is the root cause of this? This is my first Dataflow/Beam program, so any advice is appreciated.
This happens because schema accepts either a string of the form 'field1:type1,field2:type2,field3:type3' defining a comma-separated list of fields, or a TableSchema object. You can refer to this documentation: https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery
In your snippet the immediate trigger is the misspelled key "tpe" (it should be "type") in the dict schema, which is exactly the value the TableFieldSchema message rejects in the AttributeError.
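If you prefer the object form, the same schema can also be built programmatically as a TableSchema instead of a string. A minimal sketch, assuming the field names and types from your question (note the attribute is "type", not "tpe"):

from apache_beam.io.gcp.internal.clients import bigquery

def build_table_schema():
    """Builds a TableSchema equivalent to the comma-separated schema string used below."""
    table_schema = bigquery.TableSchema()
    for name, field_type, mode in [
            ('ID', 'FLOAT', 'REQUIRED'),
            ('CLUSTERED', 'FLOAT', 'NULLABLE'),
            ('SCATTERED', 'FLOAT', 'NULLABLE'),
            ('RANDOMISED', 'FLOAT', 'NULLABLE'),
            ('RANDOM_STRING', 'STRING', 'NULLABLE'),
            ('SMALL_VC', 'INTEGER', 'NULLABLE'),
            ('PADDING', 'STRING', 'NULLABLE')]:
        field = bigquery.TableFieldSchema()
        field.name = name
        field.type = field_type  # "type", not "tpe"
        field.mode = mode
        table_schema.fields.append(field)
    return table_schema

Either this object or the comma-separated string used in the corrected pipeline below can be passed as schema= to WriteToBigQuery.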
from __future__ import absolute_import
import argparse
import logging
import os
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import re
# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# details and location of source csv file
source_file = ['gs://<your_bucket>/1.csv']
class DataIngestion:
"""A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
return row
def run(argv=None):
"""Main entry point; defines and runs the pipeline."""
data_ingestion = DataIngestion()
pipeline_options = PipelineOptions(
runner='DirectRunner',
project='your-project',
temp_location='gs://<your_bucket>/temp',
region='us-central1')
p = beam.Pipeline(options=pipeline_options)
(p
| 'Create PCollection' >> beam.Create(source_file)
| 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1) ## ignore the csv header
| 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) # s is each of the String elements read in the beam.io.ReadAllFromText transform, and we apply a lambda
| 'Write to BigQuery' >> beam.io.Write(
beam.io.WriteToBigQuery(
'your_table',
dataset='your_dataset',
schema ='ID:FLOAT,CLUSTERED:FLOAT,SCATTERED:FLOAT,RANDOMISED:FLOAT,RANDOM_STRING:STRING,SMALL_VC:INTEGER,PADDING:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
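The snippet above runs locally with the DirectRunner; to submit the same pipeline to Dataflow you would typically switch the runner while keeping a GCS temp_location, for example (placeholder values for project, bucket and region):

pipeline_options = PipelineOptions(
    runner='DataflowRunner',  # submit to the Dataflow service instead of running locally
    project='your-project',
    temp_location='gs://<your_bucket>/temp',
    region='us-central1')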
Input csv file:
ID,CLUSTERED,SCATTERED,RANDOMISED,RANDOM_STRING,SMALL_VC,PADDING
12,5,23,55,team,5,bigdata
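For reference, parse_method turns that sample data row into a plain dict keyed by the column names, which is what WriteToBigQuery receives from the 'String To BigQuery Row' step. A quick local check outside the pipeline:

sample = '12,5,23,55,team,5,bigdata'
print(DataIngestion().parse_method(sample))
# {'ID': '12', 'CLUSTERED': '5', 'SCATTERED': '23', 'RANDOMISED': '55',
#  'RANDOM_STRING': 'team', 'SMALL_VC': '5', 'PADDING': 'bigdata'}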
Output: