使用 Apache Beam 将 CoGroupByKey 的输出转换为行以加载到 BigQuery Python

Convert output of CoGroupByKey as row to load into BigQuery using Apache Beam Python

无法使用 Beam Python 将 CoGroupByKey 的输出转换为行以加载到 BQ 中。 尝试了很多方法都没有成功。

CoGroupByKey结果如下:

(100, ([61], [49]))
(101, ([62], [41]))

我试过的代码是:

# Get Car cust_id, income
class getKV(beam.DoFn):
    def process(self, lines):
        return [(int(lines[0]),int(lines[3]))]
# Get Car cust_id, score
class getKV2(beam.DoFn):
    def process(self, lines):
        return [(int(lines[0]),int(lines[1]))]

class BuildRowFn(beam.DoFn):
def process(self, element):
    row = {}
    (cust_id, x) = element
    tup=()
    for e in x:
        tup = tup + tuple(e)
    (cust_income, cust_score) = tup
    row['custid'] = cust_id
    row['custincome'] = cust_income
    row['custscore'] = cust_score
    print(row)
    return row

# table_schema
new_table_spec = bigquery.TableReference(
    projectId='project',
    datasetId='dataset',
    tableId='bqtable')

table_schema = 'custid:INTEGER, income:INTEGER, score:INTEGER'

# Creating a pipeline
pipeline_options = PipelineOptions()
pipeline = beam.Pipeline(options=pipeline_options)

cust_info = (pipeline 
| 'Cust info' >> beam.io.ReadFromText('gs://bucket/info.csv', skip_header_lines=True)
| "Split cust info" >> beam.Map(lambda x: x.split(','))
| 'Get the (cust_id,income)' >> beam.ParDo(getKV())
)

cust_score = (pipeline 
| 'Cust score' >> beam.io.ReadFromText('gs://bucket/score.csv', skip_header_lines=True)
| "Split cust score" >> beam.Map(lambda x: x.split(','))
| 'Get the (cust_id,score)' >> beam.ParDo(getKV2())
)

custdata = (cust_info,cust_score | 'Merge' >> beam.CoGroupByKey()
)

(custdata 
| beam.ParDo(BuildRowFn())
| "WriteBQ" >> beam.io.WriteToBigQuery(
        new_table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

# Run a pipeline
result = pipeline.run() 

在 BuildRowFn 中尝试了不同的转换逻辑 class 但没有使用。 收到错误 TypeError: 'PCollection' object is not iterable 请建议我如何将其转换为行以加载到 BQ 中?

问题一:Pcollection不是Iterable的问题是括号()没放对。两个 PCollection 都应该在括号中才能加入。 我已更正为

custdata = (
    (cust_info,cust_score) 
    | 'Merge' >> beam.CoGroupByKey()   
)

问题 2:出于调试目的,我在写入 BQ 之前打印了数据。通过此打印,所有 PCollection 数据都被清空,因此没有任何内容可写入 BQ。 所以,我删除了打印语句。

成功了。