将 table 转换为字典的字典

Convert a table to dictionary of dictionaries

我正在使用 Apache Beam/DataFlow 并且我正在创建一个自定义函数作为管道的一部分。管道从 BigQuery 获取数据(第一步)。第二步需要对 PCollection 进行转换。此函数在 CSV 上运行良好 - 从 CSV 中的 table 创建字典字典,但我不知道如何将其作为管道的一部分,数据源是一个大查询 table.这个转换后的数据将被送入另一个需要这种格式的函数的步骤。

def preProcess(column):
    column = unidecode(column)
    column = re.sub('  +', ' ', column)
    column = re.sub('\n', ' ', column)
    column = column.strip().strip('"').strip("'").lower().strip()

    if not column:
        column = None
    return column


def readData(filename):
    data_d = {}
    with open(filename) as f:
        reader = csv.DictReader(f)
        for row in reader:
            clean_row = [(k, preProcess(v)) for (k, v) in row.items()]
            row_id = int(row['Id'])
            data_d[row_id] = dict(clean_row)

    return data_d

这里尝试编写一个函数来执行此操作:

def datatransform(source): #source is the PCollection from the readBigQuery step
    headers = {'Column1', 'Column2', Column3'}
    data = {}
    for rows in source:
        row = zip(headers,rows)
        data.append(row)
    return data

输入如下所示:

输出应该是这样的——字典的字典:

Python 的 BigQuery IO returns 您的查询结果作为行的 PCollection 作为字典,因此您不必对那里的数据做太多事情。要将所有这些元素收集到一个字典中,您可以根据您的用例使用 Combine function to collect them into one large dict. You can then consume that in the following transform as a single-element PCollection or a singleton side input