修改单个 BigQuery 列并写入新的 table
Modifying Single BigQuery column and writing to new table
我想修改 BigQuery 中的单个列并将更新后的数据写入新的 table,而无需手动保留所有其他列。我可以用下面的代码完成我想做的事情:
row = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query))
new_row = row | beam.Map(lambda x: (x["col1"], x["col2"], preprocess(x["text_col"]))
output = new_row | beam.Map(lambda (col1, col2, processed_text): {"col1": col1, "col2": col2, "text": processed_text}
output | beam.io.WriteToBigQuery(path_to_new_table)
然而,这要求我基本上手写并保存每一列——如果我有 100 多列(或者实际上什至 10 多列),这将很快变得非常混乱和繁琐。有没有更简单的方法来 运行 一些函数(在本例中为 preprocess()
)在一行上并仅更新该列并仍然保留其他列?
感谢@jkff,我已经想出了如何做到这一点。该函数应该接受并接收一个字典,然后您可以只修改字典的单个元素。类似于:
new_row = row | beam.Map(lambda x: preprocess_text(x, col_to_transform='text_column')`
其中 preprocess_text() 类似于:
def preprocess_text(row, col_to_transform):
row_copy = row.copy()
line = row_copy[col_to_transform]
line = ... # preprocessing transform goes here
row_copy[col_to_transform] = line
return row_copy
我想修改 BigQuery 中的单个列并将更新后的数据写入新的 table,而无需手动保留所有其他列。我可以用下面的代码完成我想做的事情:
row = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query))
new_row = row | beam.Map(lambda x: (x["col1"], x["col2"], preprocess(x["text_col"]))
output = new_row | beam.Map(lambda (col1, col2, processed_text): {"col1": col1, "col2": col2, "text": processed_text}
output | beam.io.WriteToBigQuery(path_to_new_table)
然而,这要求我基本上手写并保存每一列——如果我有 100 多列(或者实际上什至 10 多列),这将很快变得非常混乱和繁琐。有没有更简单的方法来 运行 一些函数(在本例中为 preprocess()
)在一行上并仅更新该列并仍然保留其他列?
感谢@jkff,我已经想出了如何做到这一点。该函数应该接受并接收一个字典,然后您可以只修改字典的单个元素。类似于:
new_row = row | beam.Map(lambda x: preprocess_text(x, col_to_transform='text_column')`
其中 preprocess_text() 类似于:
def preprocess_text(row, col_to_transform):
row_copy = row.copy()
line = row_copy[col_to_transform]
line = ... # preprocessing transform goes here
row_copy[col_to_transform] = line
return row_copy