Google Cloud Dataflow 从字典写入 CSV

Google Cloud Dataflow Write to CSV from dictionary

我有一个值字典,我想使用 Python SDK 将其作为有效的 .CSV 文件写入 GCS。我可以将字典写成换行符分隔的文本文件,但我似乎找不到将字典转换为有效 .CSV 的示例。有人可以建议在数据流管道中生成 csv 的最佳方法吗?这回答了这个 question 地址从 CSV 文件读取,但并不真正解决写入 CSV 文件的问题。我知道 CSV 文件只是带有规则的文本文件,但我仍在努力将数据字典转换为可以使用 WriteToText 写入的 CSV。

这是一个简单的示例字典,我想将其转换为 CSV:

test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]


test_input  | beam.io.WriteToText(path_to_gcs)

以上将生成一个文本文件,每个字典都在换行符上。 Apache Beam 中是否有任何我可以利用的功能(类似于 csv.DictWriter)?

通常您会想要编写一个函数,将您的原始 dict 数据元素转换为 csv 格式的 string 表示形式。

该函数可以写成 DoFn,您可以将其应用于您的 Beam PCollection 数据,这会将每个集合元素转换为所需的格式;您可以通过 ParDoDoFn 应用到您的 PCollection 来做到这一点。您还可以将此 DoFn 包装在更用户友好的 PTransform.

您可以在 Beam Programming Guide

中了解有关此过程的更多信息

这是一个简单的、可翻译的非 Beam 示例:

# Our example list of dictionary elements
test_input = [{'label': 1, 'text': 'Here is a sentence'},
             {'label': 2, 'text': 'Another sentence goes here'}]

def convert_my_dict_to_csv_record(input_dict):
    """ Turns dictionary values into a comma-separated value formatted string """
    return ','.join(map(str, input_dict.values()))

# Our converted list of elements
converted_test_input = [convert_my_dict_to_csv_record(element) for element in test_input]

converted_test_input 将如下所示:

['Here is a sentence,1', 'Another sentence goes here,2']

使用 DictWriter

的 Beam DictToCSV DoFn 和 PTransform 示例
from csv import DictWriter
from csv import excel
from cStringIO import StringIO

...

def _dict_to_csv(element, column_order, missing_val='', discard_extras=True, dialect=excel):
    """ Additional properties for delimiters, escape chars, etc via an instance of csv.Dialect
        Note: This implementation does not support unicode
    """

    buf = StringIO()

    writer = DictWriter(buf,
                        fieldnames=column_order,
                        restval=missing_val,
                        extrasaction=('ignore' if discard_extras else 'raise'),
                        dialect=dialect)
    writer.writerow(element)

    return buf.getvalue().rstrip(dialect.lineterminator)


class _DictToCSVFn(DoFn):
    """ Converts a Dictionary to a CSV-formatted String

        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in the input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect

    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def process(self, element, *args, **kwargs):
        result = _dict_to_csv(element,
                              column_order=self._column_order,
                              missing_val=self._missing_val,
                              discard_extras=self._discard_extras,
                              dialect=self._dialect)

        return [result,]

class DictToCSV(PTransform):
    """ Transforms a PCollection of Dictionaries to a PCollection of CSV-formatted Strings

        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in an input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect

    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def expand(self, pcoll):
        return pcoll | ParDo(_DictToCSVFn(column_order=self._column_order,
                                          missing_val=self._missing_val,
                                          discard_extras=self._discard_extras,
                                          dialect=self._dialect)
                             )

要使用该示例,您可以将 test_input 放入 PCollection,并将 DictToCSV PTransform 应用于 PCollection;您可以将转换后的结果 PCollection 用作 WriteToText 的输入。请注意,您必须通过 column_order 参数提供与字典输入元素的键相对应的列名列表或元组;生成的 CSV 格式的字符串列将按照提供的列名的顺序排列。此外,该示例的底层实现不支持 unicode.

根据安德鲁的建议,这是我创建的 ConvertDictToCSV 函数:

def ConvertDictToCSV(input_dict, fieldnames, separator=",", quotechar='"'):
  value_list = []
  for field in fieldnames:
    if input_dict[field]:
      field_value = str(input_dict[field])
    else:
      field_value = ""
    if separator in field_value:
      field_value = quotechar + field_value + quotechar
    value_list.append(field_value)

  return separator.join(value_list)

这似乎运作良好,但如果可能,使用 csv.DictWriter 肯定会更安全