WriteToText 仅写入临时文件

WriteToText is only writing to temp files

我是 Apache Beam 的新手,并试图在 Python 中编写我的第一个管道以从 Google Pub/Sub 订阅平面文件输出数据供以后使用;理想情况下,我想每半小时将它们打包到一个文件中。我的管道中有以下代码作为最终转换:-

| 'write output' >> WriteToText('TestNewPipeline.txt')

然而,所有创建的文件都在前缀为 "beam-temp-TestNewPipeline.txt-[somehash]" 的目录中,并以 10 个为一组进行批处理,这不是我所期望的。

我试过 window 函数,但似乎没有太大效果,所以要么我完全误解了这个概念,要么做的事情完全错误。

window 的代码是:-

 | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))

我假设这会导致文本文件的输出在静态的五秒内写入 window,但事实并非如此。

完整代码如下:-

options = PipelineOptions()
options.view_as(StandardOptions).streaming=True

def format_message(message, timestamp=beam.DoFn.TimestampParam):    
    formatted_message = {
        'data': message.data,
        'attributes': str(message.attributes),
        'timestamp': float(timestamp)
    }

    return formatted_message

with beam.Pipeline(options=options) as p:
    (p
    | 'Read From Pub Sub' >> ReadFromPubSub(subscription='projects/[my proj]/subscriptions/[my subscription]',with_attributes=True)
    | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
    | 'Map Message' >> beam.Map(format_message)
    | 'write output' >> WriteToText('TestNewPipeline.txt')
    )
result = p.run()

不出所料,进程无限期运行并成功从订阅中读取消息;但是它只将它们写入光束临时文件。有谁能帮我指出我错在哪里吗?

更新:

根据 Jason 的评论,我对管道做了一些修改:-

class AddKeyToDict(beam.DoFn):
    def process(self, element):
        return [(element['rownumber'], element)]

    with beam.Pipeline(options=options) as p:
        (p
        | 'Read From Pub Sub' >> ReadFromPubSub(subscription=known_args.input_subscription)# can't make attributes work as yet! ,with_attributes=True) 
        # failed attempt 1| 'Map Message' >> beam.Map(format_message)
        # failed attempt 2| 'Parse JSON' >> beam.Map(format_message_element)
        | 'Parse to Json' >> beam.Map(lambda x: json.loads(x))
        | 'Add key' >> beam.ParDo(AddKeyToDict())
        | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5), trigger=AfterProcessingTime(15), accumulation_mode=AccumulationMode.DISCARDING)
        | 'Group' >> beam.GroupByKey()
        | 'write output' >> WriteToText(known_args.output_file)
        )

我还不能从 PubSub 中提取 message_id 或发布时间,所以我只是使用在我的消息中生成的行号。在这一点上,我仍然只是创建了临时文件,没有任何东西积累到最终文件中?开始怀疑 Python 实现是否仍然有点缺乏,我将不得不拿起 Java...

来自Apache Beam's documentation on Windowing Constraints

If you set a windowing function using the Window transform, each element is assigned to a window, but the windows are not considered until GroupByKey or Combine aggregates across a window and key.

因为在这个例子中似乎没有键的概念,你可以尝试使用Combine吗?

通过与 Apache Beam Python 的合作,Python 尚不支持对 GCS(或本地文件系统)的流式写入,因此不会发生流式写入;目前仅支持无限目标(例如 Big Query 表)。

显然这将在即将发布的 Python v2.14.0.

Beam 版本中得到支持