Convert output of CoGroupByKey as row to load into BigQuery using Apache Beam Python
I am unable to convert the output of CoGroupByKey into rows to load into BQ using Beam Python. I have tried many approaches without success.
The CoGroupByKey result is as follows:
(100, ([61], [49]))
(101, ([62], [41]))
The code I tried is:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery

# Get Car cust_id, income
class getKV(beam.DoFn):
    def process(self, lines):
        return [(int(lines[0]), int(lines[3]))]

# Get Car cust_id, score
class getKV2(beam.DoFn):
    def process(self, lines):
        return [(int(lines[0]), int(lines[1]))]

class BuildRowFn(beam.DoFn):
    def process(self, element):
        row = {}
        (cust_id, x) = element
        tup = ()
        for e in x:
            tup = tup + tuple(e)
        (cust_income, cust_score) = tup
        row['custid'] = cust_id
        row['custincome'] = cust_income
        row['custscore'] = cust_score
        print(row)
        return row

# table_schema
new_table_spec = bigquery.TableReference(
    projectId='project',
    datasetId='dataset',
    tableId='bqtable')
table_schema = 'custid:INTEGER, income:INTEGER, score:INTEGER'

# Creating a pipeline
pipeline_options = PipelineOptions()
pipeline = beam.Pipeline(options=pipeline_options)

cust_info = (pipeline
             | 'Cust info' >> beam.io.ReadFromText('gs://bucket/info.csv', skip_header_lines=True)
             | "Split cust info" >> beam.Map(lambda x: x.split(','))
             | 'Get the (cust_id,income)' >> beam.ParDo(getKV())
             )

cust_score = (pipeline
              | 'Cust score' >> beam.io.ReadFromText('gs://bucket/score.csv', skip_header_lines=True)
              | "Split cust score" >> beam.Map(lambda x: x.split(','))
              | 'Get the (cust_id,score)' >> beam.ParDo(getKV2())
              )

custdata = (cust_info, cust_score | 'Merge' >> beam.CoGroupByKey()
            )

(custdata
 | beam.ParDo(BuildRowFn())
 | "WriteBQ" >> beam.io.WriteToBigQuery(
       new_table_spec,
       schema=table_schema,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
 )

# Run a pipeline
result = pipeline.run()
I tried different conversion logic in the BuildRowFn class, but it was of no use.
I am getting the error TypeError: 'PCollection' object is not iterable.
Please suggest how I can convert this into rows to load into BQ.
Problem 1: The "PCollection object is not iterable" error was caused by misplaced parentheses. Both PCollections should be wrapped in parentheses so they can be joined.
I corrected it to:
custdata = (
    (cust_info, cust_score)
    | 'Merge' >> beam.CoGroupByKey()
)
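As a side note beyond the original fix, CoGroupByKey also accepts a dict of tagged PCollections; the grouped values then arrive as a dict keyed by those tags, which can make the row-building step easier to follow. A minimal sketch, assuming the same cust_info and cust_score collections:

# Sketch only: with tagged inputs, each output element looks like
# (cust_id, {'income': [<income values>], 'score': [<score values>]})
custdata = (
    {'income': cust_info, 'score': cust_score}
    | 'Merge' >> beam.CoGroupByKey()
)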
Problem 2: For debugging purposes, I was printing the data before writing to BQ. That print step emptied the whole PCollection, so there was nothing left to write to BQ.
So I removed the print statement.
It worked.
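If you still want the debug output, one option (a sketch, not from the original code) is to print as a side effect and return the element unchanged, so the PCollection feeding WriteToBigQuery keeps its data:

def debug_print(row):
    # Print for debugging, then pass the element through unchanged
    # so the downstream BigQuery write still receives it.
    print(row)
    return row

rows = custdata | 'Build rows' >> beam.ParDo(BuildRowFn())
(rows
 | 'Debug print' >> beam.Map(debug_print)
 | 'WriteBQ' >> beam.io.WriteToBigQuery(
       new_table_spec,
       schema=table_schema,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
 )

Note that DoFn.process is expected to return an iterable of outputs, so yielding row (or returning [row]) from BuildRowFn emits the whole dict as one element rather than iterating over its keys.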