NameError: name 'beam' is not defined

NameError: name 'beam' is not defined

in get_csv_reader NameError: name 'beam' is not defined [while 运行ning 'Flatten the CSV-ptransform-73']

我正在尝试在 Apache Beam 上读取 CSV 并将其加载到 BQ。我使用 CSV (headers) 中的第一行将所有行转换为字典,然后再将其加载到 BQ。

下面是我的流水线代码

#pipeline
def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()



    parser.add_argument(
        "--output_table",
        help="Output BigQuery table for results specified as: "
             "PROJECT:DATASET.TABLE or DATASET.TABLE.",
    )
    parser.add_argument(
        "--input_file",
        help="file location of input data "
             '"GCS path"',
    )

    known_args, pipeline_args = parser.parse_known_args(argv)
    input_file = known_args.input_file
    output_table = known_args.output_table


    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # headers = ['id', '_title12764THE_', 'type', 'description', 'release_year', 'age_certification', 'runtime', 'genres', 'production_countries', 'seasons', 'imdb_id', 'imdb_score', 'imdb_votes', 'tmdb_popularity', 'tmdb_score']
    headers= read_headers(input_file)
    with beam.Pipeline(options=pipeline_options) as p:

    # Create Pipeline (PCollections)
        parsed_csv = (p | 'Create from CSV' >> beam.Create([input_file]))
        flattened_file = (parsed_csv | 'Flatten the CSV' >> beam.FlatMap(get_csv_reader))
        convert = flattened_file | "bq convert" >> beam.Map(lambda x: parse_csv(x,headers))
        convert | 'Write to bq' >> beam.io.WriteToBigQuery(
            output_table,
            schema='SCHEMA_AUTODETECT',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)   

阅读方法 headers:

#reading headers
def read_headers(csv_file):
  fs = gcsfs.GCSFileSystem(project='hidden-mapper-351214')
  with fs.open(csv_file,'r') as f:
    header_line = f.readline().strip()
  headers = next(csv.reader([header_line]))

  # BQ Column name requirements

  BQ_headers=[]
  for head in headers:
      BQ_headers.append(re.sub('\W+', '', head.lstrip('0123456789')))

  return BQ_headers

将CSV读入P-collection的方法:

def get_csv_reader(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read file as a CSV
    gcs_reader = csv.reader(io.TextIOWrapper(gcs_file))

    # next(gcs_reader)

    return gcs_reader

当我 运行 代码时,出现以下错误:

File "/home/akhil_kakumanu/ingest-demo/ingest-csv.py", line 32, in get_csv_reader
NameError: name 'beam' is not defined [while running 'Flatten the CSV-ptransform-73']

我的第32行是这个

gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

当我 hard-coded 列表中的 headers 时,我的代码最初运行良好。但是当我使用 GCSFS 时,它开始抛出该错误。我认为它必须用两个不同的库打开同一个文件,但不确定。我也安装了 beam SDK 和 GCSFS。就像,我说当我硬编码 headers.

时它工作正常

谁能告诉我为什么会这样以及如何解决这个问题?另外,如果有任何其他有效的方法来读取任何 CSV 并将其推送到 BQ,请提出建议。

你可以尝试添加

import apache_beam as beam

内部函数 def get_csv_reader?

NameError 的问题通常是工作人员不知道来自全局命名空间的值。参见 https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors