将 table 转换为字典的字典
Convert a table to dictionary of dictionaries
我正在使用 Apache Beam/DataFlow 并且我正在创建一个自定义函数作为管道的一部分。管道从 BigQuery 获取数据(第一步)。第二步需要对 PCollection 进行转换。此函数在 CSV 上运行良好 - 从 CSV 中的 table 创建字典字典,但我不知道如何将其作为管道的一部分,数据源是一个大查询 table.这个转换后的数据将被送入另一个需要这种格式的函数的步骤。
def preProcess(column):
column = unidecode(column)
column = re.sub(' +', ' ', column)
column = re.sub('\n', ' ', column)
column = column.strip().strip('"').strip("'").lower().strip()
if not column:
column = None
return column
def readData(filename):
data_d = {}
with open(filename) as f:
reader = csv.DictReader(f)
for row in reader:
clean_row = [(k, preProcess(v)) for (k, v) in row.items()]
row_id = int(row['Id'])
data_d[row_id] = dict(clean_row)
return data_d
这里尝试编写一个函数来执行此操作:
def datatransform(source): #source is the PCollection from the readBigQuery step
headers = {'Column1', 'Column2', Column3'}
data = {}
for rows in source:
row = zip(headers,rows)
data.append(row)
return data
输入如下所示:
输出应该是这样的——字典的字典:
Python 的 BigQuery IO returns 您的查询结果作为行的 PCollection 作为字典,因此您不必对那里的数据做太多事情。要将所有这些元素收集到一个字典中,您可以根据您的用例使用 Combine function to collect them into one large dict. You can then consume that in the following transform as a single-element PCollection or a singleton side input。
我正在使用 Apache Beam/DataFlow 并且我正在创建一个自定义函数作为管道的一部分。管道从 BigQuery 获取数据(第一步)。第二步需要对 PCollection 进行转换。此函数在 CSV 上运行良好 - 从 CSV 中的 table 创建字典字典,但我不知道如何将其作为管道的一部分,数据源是一个大查询 table.这个转换后的数据将被送入另一个需要这种格式的函数的步骤。
def preProcess(column):
column = unidecode(column)
column = re.sub(' +', ' ', column)
column = re.sub('\n', ' ', column)
column = column.strip().strip('"').strip("'").lower().strip()
if not column:
column = None
return column
def readData(filename):
data_d = {}
with open(filename) as f:
reader = csv.DictReader(f)
for row in reader:
clean_row = [(k, preProcess(v)) for (k, v) in row.items()]
row_id = int(row['Id'])
data_d[row_id] = dict(clean_row)
return data_d
这里尝试编写一个函数来执行此操作:
def datatransform(source): #source is the PCollection from the readBigQuery step
headers = {'Column1', 'Column2', Column3'}
data = {}
for rows in source:
row = zip(headers,rows)
data.append(row)
return data
输入如下所示:
输出应该是这样的——字典的字典:
Python 的 BigQuery IO returns 您的查询结果作为行的 PCollection 作为字典,因此您不必对那里的数据做太多事情。要将所有这些元素收集到一个字典中,您可以根据您的用例使用 Combine function to collect them into one large dict. You can then consume that in the following transform as a single-element PCollection or a singleton side input。