WriteToText 仅写入临时文件
WriteToText is only writing to temp files
我是 Apache Beam 的新手,并试图在 Python 中编写我的第一个管道以从 Google Pub/Sub 订阅平面文件输出数据供以后使用;理想情况下,我想每半小时将它们打包到一个文件中。我的管道中有以下代码作为最终转换:-
| 'write output' >> WriteToText('TestNewPipeline.txt')
然而,所有创建的文件都在前缀为 "beam-temp-TestNewPipeline.txt-[somehash]" 的目录中,并以 10 个为一组进行批处理,这不是我所期望的。
我试过 window 函数,但似乎没有太大效果,所以要么我完全误解了这个概念,要么做的事情完全错误。
window 的代码是:-
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
我假设这会导致文本文件的输出在静态的五秒内写入 window,但事实并非如此。
完整代码如下:-
options = PipelineOptions()
options.view_as(StandardOptions).streaming=True
def format_message(message, timestamp=beam.DoFn.TimestampParam):
formatted_message = {
'data': message.data,
'attributes': str(message.attributes),
'timestamp': float(timestamp)
}
return formatted_message
with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription='projects/[my proj]/subscriptions/[my subscription]',with_attributes=True)
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
| 'Map Message' >> beam.Map(format_message)
| 'write output' >> WriteToText('TestNewPipeline.txt')
)
result = p.run()
不出所料,进程无限期运行并成功从订阅中读取消息;但是它只将它们写入光束临时文件。有谁能帮我指出我错在哪里吗?
更新:
根据 Jason 的评论,我对管道做了一些修改:-
class AddKeyToDict(beam.DoFn):
def process(self, element):
return [(element['rownumber'], element)]
with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription=known_args.input_subscription)# can't make attributes work as yet! ,with_attributes=True)
# failed attempt 1| 'Map Message' >> beam.Map(format_message)
# failed attempt 2| 'Parse JSON' >> beam.Map(format_message_element)
| 'Parse to Json' >> beam.Map(lambda x: json.loads(x))
| 'Add key' >> beam.ParDo(AddKeyToDict())
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5), trigger=AfterProcessingTime(15), accumulation_mode=AccumulationMode.DISCARDING)
| 'Group' >> beam.GroupByKey()
| 'write output' >> WriteToText(known_args.output_file)
)
我还不能从 PubSub 中提取 message_id 或发布时间,所以我只是使用在我的消息中生成的行号。在这一点上,我仍然只是创建了临时文件,没有任何东西积累到最终文件中?开始怀疑 Python 实现是否仍然有点缺乏,我将不得不拿起 Java...
来自Apache Beam's documentation on Windowing Constraints:
If you set a windowing function using the Window
transform, each element is assigned to a window, but the windows are not considered until GroupByKey
or Combine
aggregates across a window and key.
因为在这个例子中似乎没有键的概念,你可以尝试使用Combine
吗?
通过与 Apache Beam Python 的合作,Python 尚不支持对 GCS(或本地文件系统)的流式写入,因此不会发生流式写入;目前仅支持无限目标(例如 Big Query 表)。
显然这将在即将发布的 Python v2.14.0.
Beam 版本中得到支持
我是 Apache Beam 的新手,并试图在 Python 中编写我的第一个管道以从 Google Pub/Sub 订阅平面文件输出数据供以后使用;理想情况下,我想每半小时将它们打包到一个文件中。我的管道中有以下代码作为最终转换:-
| 'write output' >> WriteToText('TestNewPipeline.txt')
然而,所有创建的文件都在前缀为 "beam-temp-TestNewPipeline.txt-[somehash]" 的目录中,并以 10 个为一组进行批处理,这不是我所期望的。
我试过 window 函数,但似乎没有太大效果,所以要么我完全误解了这个概念,要么做的事情完全错误。
window 的代码是:-
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
我假设这会导致文本文件的输出在静态的五秒内写入 window,但事实并非如此。
完整代码如下:-
options = PipelineOptions()
options.view_as(StandardOptions).streaming=True
def format_message(message, timestamp=beam.DoFn.TimestampParam):
formatted_message = {
'data': message.data,
'attributes': str(message.attributes),
'timestamp': float(timestamp)
}
return formatted_message
with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription='projects/[my proj]/subscriptions/[my subscription]',with_attributes=True)
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
| 'Map Message' >> beam.Map(format_message)
| 'write output' >> WriteToText('TestNewPipeline.txt')
)
result = p.run()
不出所料,进程无限期运行并成功从订阅中读取消息;但是它只将它们写入光束临时文件。有谁能帮我指出我错在哪里吗?
更新:
根据 Jason 的评论,我对管道做了一些修改:-
class AddKeyToDict(beam.DoFn):
def process(self, element):
return [(element['rownumber'], element)]
with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription=known_args.input_subscription)# can't make attributes work as yet! ,with_attributes=True)
# failed attempt 1| 'Map Message' >> beam.Map(format_message)
# failed attempt 2| 'Parse JSON' >> beam.Map(format_message_element)
| 'Parse to Json' >> beam.Map(lambda x: json.loads(x))
| 'Add key' >> beam.ParDo(AddKeyToDict())
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5), trigger=AfterProcessingTime(15), accumulation_mode=AccumulationMode.DISCARDING)
| 'Group' >> beam.GroupByKey()
| 'write output' >> WriteToText(known_args.output_file)
)
我还不能从 PubSub 中提取 message_id 或发布时间,所以我只是使用在我的消息中生成的行号。在这一点上,我仍然只是创建了临时文件,没有任何东西积累到最终文件中?开始怀疑 Python 实现是否仍然有点缺乏,我将不得不拿起 Java...
来自Apache Beam's documentation on Windowing Constraints:
If you set a windowing function using the
Window
transform, each element is assigned to a window, but the windows are not considered untilGroupByKey
orCombine
aggregates across a window and key.
因为在这个例子中似乎没有键的概念,你可以尝试使用Combine
吗?
通过与 Apache Beam Python 的合作,Python 尚不支持对 GCS(或本地文件系统)的流式写入,因此不会发生流式写入;目前仅支持无限目标(例如 Big Query 表)。
显然这将在即将发布的 Python v2.14.0.
Beam 版本中得到支持