如何将每个标记的输出写入 Apache Beam 中的不同文件

How to write each tagged output to different file in Apache beam

我有这段代码,它根据输入文件的一些数据标记输出:

class filters(beam.DoFn):
 def process(self, element): 
    data = json.loads(element)
    yield TaggedOutput(data['EventName'],element)

我需要有关编写结果标记输出的下一步的帮助:

tagged = lines | beam.ParDo(filters()).with_outputs('How can I dinamiclly acces this tags?')

所以你可以看到我什么时候做 '.with_outputs()' 我不知道标签的数量和名称是什么所以我无法预测这样的事情:

tag1 = tagged.tag1

感谢您的帮助

更新:这不会工作,因为 with.outputs() 是空的

tagged_data= lines | 'tagged data by key' >>  
beam.ParDo(filters()).with_outputs()

for tag in tagged_data:
    print('something')

output: WARNING:apache_beam.options.pipeline_options:Discarding unparseable args

但这会起作用

tagged_data= lines | 'tagged data by key' >>  
beam.ParDo(filters()).with_outputs('tag1','tag2')

for tag in tagged_data:
    print('something')

output:
  something
  something

要实现您想要实现的目标,您需要创建一个 DoFn。您可以使用此示例作为基础:

from apache_beam.io.textio import _TextSink


class WriteEachKeyToText(beam.DoFn):
    def __init__(self, file_path_prefix=str):
        super().__init__()
        self.file_path_prefix = file_path_prefix

    def process(self, kv):
        key = kv[0]
        elements = kv[1]
        sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")

        writer = sink.open_writer("prefix", self.file_path_prefix)
        for e in elements:  # values
            writer.write(e)

那么,你可以这样使用它:

output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))

Apache Beam 管道执行被推迟——要执行的操作的 DAG 已经建立,在您 run 您的管道之前什么都不会发生。 (在 Beam Python 中,这通常在 with beam.Pipeline(...) 块的末尾隐式调用。)。 PCollections 实际上并不包含数据,只是关于如何计算数据的说明。

特别是,这意味着当你写

tagged = lines | beam.ParDo(filters()).with_outputs(...)

tagged 实际上不包含任何数据,而是包含对将生成的 PCollections 的引用(并且可以向它们添加进一步的处理步骤)。 lines 中的数据 尚未实际计算或读取 因此您无法(在管道构造期间)弄清楚输出集是什么。

不清楚问题的最终目标是什么,但如果您尝试对输出进行分区,则可能需要查看 dynamic destinations.