Apache-Beam + Python:将 JSON(或字典)字符串写入输出文件

Apache-Beam + Python: Writing JSON (or dictionaries) strings to output file

我正在尝试使用 Beam 管道,以便将 SequenceMatcher 函数应用于大量单词。我(希望)已经弄清楚了除了 WriteToText 部分之外的所有内容。

我已经定义了一个自定义的 ParDo(这里称为 ProcessDataDoFn),它接受 main_input 和 side_input,处理它们并像这样输出字典

{u'key': (u'string', float)}

我的管道很简单

class ProcessDataDoFn(beam.DoFn):
    def process(self, element, side_input):

    ... Series of operations ...

    return output_dictionary

with beam.Pipeline(options=options) as p:

    # Main input
    main_input = p | 'ReadMainInput' >> beam.io.Read(
        beam.io.BigQuerySource(
            query=CUSTOM_SQL,
            use_standard_sql=True
        ))

    # Side input
    side_input = p | 'ReadSideInput' >> beam.io.Read(
        beam.io.BigQuerySource(
            project=PROJECT_ID,
            dataset=DATASET,
            table=TABLE
        ))

    output = (
        main_input
        | 'ProcessData' >> beam.ParDo(
            ProcessDataDoFn(),
            side_input=beam.pvalue.AsList(side_input))
        | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
    )

现在的问题是,如果我这样离开管道,它只会输出 output_dictionary 的密钥。如果我将 ProcessDataDoFn 的 return 更改为 json.dumps(ouput_dictionary),则 Json 的写法是正确的,但像这样

{
'
k
e
y
'

:

[
'
s
t
r
i
n
g
'

,

f
l
o
a
t
]

怎样才能正确输出结果?

你的输出看起来像这样很不寻常。 json.dumps 应该在一行中打印 json,并且应该逐行输出到文件。

也许为了拥有更简洁的代码,您可以添加一个额外的地图操作,以按照您需要的方式进行格式化。像这样:

output = (
  main_input
  | 'ProcessData' >> beam.ParDo(
        ProcessDataDoFn(),
        side_input=beam.pvalue.AsList(side_input))
  | 'FormatOutput' >> beam.Map(json.dumps)
  | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
)

我实际上部分解决了这个问题。

我编写的 ParDoFn return 字典或 JSON 格式化字符串。在这两种情况下,当 Beam 尝试对所述输入做某事时,问题就会出现。如果所说的 PCollection 是一个字典,Beam 似乎会遍历给定的 PCollection,它只获取它的键,如果所说的 PCollection 是一个字符串,它会遍历所有字符(这就是 JSON 输出如此奇怪的原因)。我发现解决方案相当简单:将字典或字符串封装在列表中。 JSON 格式化部分可以在 ParDoFn 级别完成,也可以像您展示的那样通过转换完成。