Beam Python Dataflow, writing to BigQuery table with schema provided throws AttributeError: May not assign arbitrary value tpe to message

I am trying this code on Dataflow. It reads a csv file from a gs:// bucket, creates the BigQuery table, and appends the data. The code is below:

from __future__ import absolute_import

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage

import regex as re

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# details and location of source csv file
source_file = ['gs://xxx/DUMMY.csv']

class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""


    def parse_method(self, string_input):
        # Strip surrounding quotes and any trailing \r\n, then split the csv line
        # on commas and zip the values with the BigQuery column names.
        values = re.split(",", re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())


    (p
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # ignore the csv header
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))  # s is each String element read by beam.io.ReadAllFromText
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             'DUMMY',
             dataset='test',
             schema={
                 "fields": [
                     {
                         "name": "ID",
                         "type": "FLOAT",
                         "mode": "REQUIRED"
                     },
                     {
                         "name": "CLUSTERED",
                         "tpe": "FLOAT",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "SCATTERED",
                         "tpe": "FLOAT",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "RANDOMISED",
                         "tpe": "FLOAT",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "RANDOM_STRING",
                         "tpe": "STRING",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "SMALL_VC",
                         "tpe": "INTEGER",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "PADDING",
                         "tpe": "STRING",
                         "mode": "NULLABLE"
                     }
                 ]
             },
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The code throws this error:

  File "/usr/local/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 976, in __setattr__
    "to message %s" % (name, type(self).__name__))
AttributeError: May not assign arbitrary value tpe to message TableFieldSchema [while running 'Write to BigQuery/WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables']

If I try the following

schema='SCHEMA_AUTODETECT'

instead of the JSON schema above, it works fine. It also fails if I try to reference a JSON file in schema=. What is the root cause? This is my first Dataflow/Beam program, so any advice is appreciated.

The error comes from the schema you passed: the key "tpe" in the field dictionaries is a typo for "type", and when Beam builds a TableFieldSchema message from each entry it rejects the unknown attribute, hence "May not assign arbitrary value tpe to message TableFieldSchema". The schema argument accepts either a string of the form 'field1:type1,field2:type2,field3:type3' describing a comma-separated list of fields, or a TableSchema object. You can refer to this documentation: https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery
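For completeness, here is a minimal sketch of the TableSchema-object form, following the pattern used in the Beam documentation; the field names and types are taken from your csv, and the helper name build_table_schema is just an illustration:

from apache_beam.io.gcp.internal.clients import bigquery

def build_table_schema():
    # Build the same schema as the dict above, with "type" spelled correctly.
    table_schema = bigquery.TableSchema()
    for name, field_type, mode in [
            ('ID', 'FLOAT', 'REQUIRED'),
            ('CLUSTERED', 'FLOAT', 'NULLABLE'),
            ('SCATTERED', 'FLOAT', 'NULLABLE'),
            ('RANDOMISED', 'FLOAT', 'NULLABLE'),
            ('RANDOM_STRING', 'STRING', 'NULLABLE'),
            ('SMALL_VC', 'INTEGER', 'NULLABLE'),
            ('PADDING', 'STRING', 'NULLABLE')]:
        field = bigquery.TableFieldSchema()
        field.name = name
        field.type = field_type
        field.mode = mode
        table_schema.fields.append(field)
    return table_schema

# Pass schema=build_table_schema() to beam.io.WriteToBigQuery instead of the dict.

The string form is simpler when you do not need per-field modes, and it is what the full example below uses.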

from __future__ import absolute_import

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


import re 

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# details and location of source csv file
source_file = ['gs://<your_bucket>/1.csv']

class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""


    def parse_method(self, string_input):

        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    
    pipeline_options = PipelineOptions(
        runner='DirectRunner',
        project='your-project',
        temp_location='gs://<your_bucket>/temp',
        region='us-central1')
    
    p = beam.Pipeline(options=pipeline_options)


    (p
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # ignore the csv header
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))  # s is each String element read by beam.io.ReadAllFromText
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             'your_table',
             dataset='your_dataset',
             schema='ID:FLOAT,CLUSTERED:FLOAT,SCATTERED:FLOAT,RANDOMISED:FLOAT,RANDOM_STRING:STRING,SMALL_VC:INTEGER,PADDING:STRING',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
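
The example above runs with the DirectRunner. To execute the same pipeline on Dataflow, only the pipeline options need to change; a minimal sketch, assuming your own project id, bucket, and region as placeholders:

pipeline_options = PipelineOptions(
    runner='DataflowRunner',                        # execute on Google Cloud Dataflow
    project='your-project',                         # placeholder project id
    temp_location='gs://<your_bucket>/temp',        # temporary files, including BigQuery load files
    staging_location='gs://<your_bucket>/staging',  # where the pipeline code is staged
    region='us-central1')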

Input csv file:

ID,CLUSTERED,SCATTERED,RANDOMISED,RANDOM_STRING,SMALL_VC,PADDING
12,5,23,55,team,5,bigdata

Output: