如何将每个标记的输出写入 Apache Beam 中的不同文件
How to write each tagged output to different file in Apache beam
我有这段代码,它根据输入文件的一些数据标记输出:
class filters(beam.DoFn):
def process(self, element):
data = json.loads(element)
yield TaggedOutput(data['EventName'],element)
我需要有关编写结果标记输出的下一步的帮助:
tagged = lines | beam.ParDo(filters()).with_outputs('How can I dinamiclly acces this tags?')
所以你可以看到我什么时候做 '.with_outputs()' 我不知道标签的数量和名称是什么所以我无法预测这样的事情:
tag1 = tagged.tag1
感谢您的帮助
更新:这不会工作,因为 with.outputs() 是空的
tagged_data= lines | 'tagged data by key' >>
beam.ParDo(filters()).with_outputs()
for tag in tagged_data:
print('something')
output: WARNING:apache_beam.options.pipeline_options:Discarding unparseable args
但这会起作用
tagged_data= lines | 'tagged data by key' >>
beam.ParDo(filters()).with_outputs('tag1','tag2')
for tag in tagged_data:
print('something')
output:
something
something
要实现您想要实现的目标,您需要创建一个 DoFn。您可以使用此示例作为基础:
from apache_beam.io.textio import _TextSink
class WriteEachKeyToText(beam.DoFn):
def __init__(self, file_path_prefix=str):
super().__init__()
self.file_path_prefix = file_path_prefix
def process(self, kv):
key = kv[0]
elements = kv[1]
sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")
writer = sink.open_writer("prefix", self.file_path_prefix)
for e in elements: # values
writer.write(e)
那么,你可以这样使用它:
output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))
Apache Beam 管道执行被推迟——要执行的操作的 DAG 已经建立,在您 run 您的管道之前什么都不会发生。 (在 Beam Python 中,这通常在 with beam.Pipeline(...)
块的末尾隐式调用。)。 PCollections 实际上并不包含数据,只是关于如何计算数据的说明。
特别是,这意味着当你写
tagged = lines | beam.ParDo(filters()).with_outputs(...)
tagged 实际上不包含任何数据,而是包含对将生成的 PCollections 的引用(并且可以向它们添加进一步的处理步骤)。 lines
中的数据 尚未实际计算或读取 因此您无法(在管道构造期间)弄清楚输出集是什么。
不清楚问题的最终目标是什么,但如果您尝试对输出进行分区,则可能需要查看 dynamic destinations.
我有这段代码,它根据输入文件的一些数据标记输出:
class filters(beam.DoFn):
def process(self, element):
data = json.loads(element)
yield TaggedOutput(data['EventName'],element)
我需要有关编写结果标记输出的下一步的帮助:
tagged = lines | beam.ParDo(filters()).with_outputs('How can I dinamiclly acces this tags?')
所以你可以看到我什么时候做 '.with_outputs()' 我不知道标签的数量和名称是什么所以我无法预测这样的事情:
tag1 = tagged.tag1
感谢您的帮助
更新:这不会工作,因为 with.outputs() 是空的
tagged_data= lines | 'tagged data by key' >>
beam.ParDo(filters()).with_outputs()
for tag in tagged_data:
print('something')
output: WARNING:apache_beam.options.pipeline_options:Discarding unparseable args
但这会起作用
tagged_data= lines | 'tagged data by key' >>
beam.ParDo(filters()).with_outputs('tag1','tag2')
for tag in tagged_data:
print('something')
output:
something
something
要实现您想要实现的目标,您需要创建一个 DoFn。您可以使用此示例作为基础:
from apache_beam.io.textio import _TextSink
class WriteEachKeyToText(beam.DoFn):
def __init__(self, file_path_prefix=str):
super().__init__()
self.file_path_prefix = file_path_prefix
def process(self, kv):
key = kv[0]
elements = kv[1]
sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")
writer = sink.open_writer("prefix", self.file_path_prefix)
for e in elements: # values
writer.write(e)
那么,你可以这样使用它:
output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))
Apache Beam 管道执行被推迟——要执行的操作的 DAG 已经建立,在您 run 您的管道之前什么都不会发生。 (在 Beam Python 中,这通常在 with beam.Pipeline(...)
块的末尾隐式调用。)。 PCollections 实际上并不包含数据,只是关于如何计算数据的说明。
特别是,这意味着当你写
tagged = lines | beam.ParDo(filters()).with_outputs(...)
tagged 实际上不包含任何数据,而是包含对将生成的 PCollections 的引用(并且可以向它们添加进一步的处理步骤)。 lines
中的数据 尚未实际计算或读取 因此您无法(在管道构造期间)弄清楚输出集是什么。
不清楚问题的最终目标是什么,但如果您尝试对输出进行分区,则可能需要查看 dynamic destinations.