NameError: name 'beam' is not defined
in get_csv_reader NameError: name 'beam' is not defined [while running 'Flatten the CSV-ptransform-73']
I am trying to read a CSV with Apache Beam and load it into BQ. I use the first row of the CSV (the headers) to convert every row into a dictionary before loading it into BQ.
Below is my pipeline code:
# module-level imports used by the snippets below
import argparse
import csv
import io
import re

import gcsfs
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


# pipeline
def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output_table",
        help="Output BigQuery table for results specified as: "
        "PROJECT:DATASET.TABLE or DATASET.TABLE.",
    )
    parser.add_argument(
        "--input_file",
        help="file location of input data "
        '"GCS path"',
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    input_file = known_args.input_file
    output_table = known_args.output_table
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # headers = ['id', '_title12764THE_', 'type', 'description', 'release_year', 'age_certification', 'runtime', 'genres', 'production_countries', 'seasons', 'imdb_id', 'imdb_score', 'imdb_votes', 'tmdb_popularity', 'tmdb_score']
    headers = read_headers(input_file)

    with beam.Pipeline(options=pipeline_options) as p:
        # Create Pipeline (PCollections)
        parsed_csv = (p | 'Create from CSV' >> beam.Create([input_file]))
        flattened_file = (parsed_csv | 'Flatten the CSV' >> beam.FlatMap(get_csv_reader))
        convert = flattened_file | "bq convert" >> beam.Map(lambda x: parse_csv(x, headers))
        convert | 'Write to bq' >> beam.io.WriteToBigQuery(
            output_table,
            schema='SCHEMA_AUTODETECT',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
Method that reads the headers:
# reading headers
def read_headers(csv_file):
    fs = gcsfs.GCSFileSystem(project='hidden-mapper-351214')
    with fs.open(csv_file, 'r') as f:
        header_line = f.readline().strip()
    headers = next(csv.reader([header_line]))
    # BQ column name requirements
    BQ_headers = []
    for head in headers:
        BQ_headers.append(re.sub(r'\W+', '', head.lstrip('0123456789')))
    return BQ_headers
Method that reads the CSV into a PCollection:
def get_csv_reader(readable_file):
    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
    # Read file as a CSV
    gcs_reader = csv.reader(io.TextIOWrapper(gcs_file))
    # next(gcs_reader)
    return gcs_reader
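(For completeness: parse_csv is not shown here. A hypothetical version consistent with how it is called in the pipeline above, turning each row into a dictionary keyed by the cleaned headers, would be something like:)

def parse_csv(row, headers):
    # Hypothetical helper (not included in the post): pair each
    # BigQuery column name with the corresponding value in the CSV row.
    return dict(zip(headers, row))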
When I run the code, I get the following error:
File "/home/akhil_kakumanu/ingest-demo/ingest-csv.py", line 32, in get_csv_reader
NameError: name 'beam' is not defined [while running 'Flatten the CSV-ptransform-73']
My line 32 is this:
gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
My code originally ran fine when I hard-coded the headers in a list, but once I started using GCSFS it began throwing that error. I suspect it has something to do with opening the same file with two different libraries, but I am not sure. I have both the Beam SDK and GCSFS installed. Like I said, it works fine when I hard-code the headers.
Can anyone tell me why this is happening and how to fix it? Also, if there is any other efficient way to read any CSV and push it to BQ, please suggest it.
Could you try adding

import apache_beam as beam

inside the function def get_csv_reader?
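A minimal sketch of that change, keeping the rest of the function as posted (the only addition is the import at the top of the function, so the name 'beam' is defined in the process that runs the FlatMap):

import csv
import io

def get_csv_reader(readable_file):
    # Importing here makes 'beam' available in the worker process
    # that executes 'Flatten the CSV', independent of the main session.
    import apache_beam as beam
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
    return csv.reader(io.TextIOWrapper(gcs_file))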
The problem behind a NameError is usually that the workers do not know about values from the global namespace. See https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors.
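As a rough sketch of the pattern that FAQ entry points at (your run() already passes save_main_session through to SetupOptions, so this is just the relevant part isolated): with the flag set, the state of the main module, including module-level imports such as apache_beam, is pickled and re-created on each Dataflow worker.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    # Pickle the main session so that module-level imports and globals
    # (e.g. 'beam') are also defined on the Dataflow workers.
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        ...  # pipeline steps go here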