Apache Beam Side Inputs not working in Streaming Dataflow Pipeline with Python SDK

I'm working on a larger Dataflow pipeline that works fine in batch mode, but the refactoring I just finished has problems with side inputs. If I put the pipeline into streaming mode and remove the side input, it runs perfectly on Google's Dataflow.

I stripped everything down and built the following short script to encapsulate the problem and be able to play around with it.

import apache_beam as beam
import os
import json
import sys
import logging

""" Here the switches """
run_local = False
streaming = False

project = 'google-project-name'
bucket = 'dataflow_bucket'
tmp_location = 'gs://{}/{}/'.format(bucket, "tmp")
topic = "projects/{}/topics/dataflowtopic".format(project)

credentials_file = os.path.join(os.path.dirname(__file__), "credentials.json")
if os.path.exists(credentials_file):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_file

if not run_local:
    runner = "DataflowRunner"
else:
    runner = "DirectRunner"

argv = [
    '--region={0}'.format("europe-west6"),
    '--project={0}'.format(project),
    '--temp_location={0}'.format(tmp_location),
    '--runner={0}'.format(runner),
    '--save_main_session',
    '--max_num_workers=2'
]
if streaming:
    argv.append('--streaming')

def filter_existing(item, existing=None, id_field: str = 'id'):
    # Keep only items whose id is not in the side-input list.
    sys.stdout.write(".")
    return item.get(id_field) not in existing


class UserProcessor(beam.DoFn):
    def process(self, user, **kwargs):

        if run_local:
            print("processing a user and getting items for it")
        else:
            logging.info("processing a user and getting items for it")
        for i in range(10):
            yield {"id": i, "item_name": "dingdong", "user": user}


class ExistingProcessor(beam.DoFn):
    def process(self, user, **kwargs):
        if run_local:
            print("creating the list to exclude items from the result")
        else:
            logging.info("creating the list to exclude items from the result")
        yield {"id": 3}
        yield {"id": 5}
        yield {"id": 9}


with beam.Pipeline(argv=argv) as p:
    if streaming:
        if run_local:
            print("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        else:
            logging.info("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        users = p | "Loading user" >> beam.io.ReadFromPubSub(topic=topic) | beam.Map(lambda x: json.loads(x.decode()))
    else:
        if run_local:
            print("Loading Demo User")
        else:
            logging.info("Loading Demo User")

        example_user = {"id": "indi", "name": "Indiana Jones"}
        users = p | "Loading user" >> beam.Create([example_user])

    process1 = users | "load all items for user" >> beam.ParDo(UserProcessor().with_input_types(dict))
    process2 = users | "load existing items for user" >> beam.ParDo(ExistingProcessor().with_input_types(dict)) | beam.Map(lambda x: x.get('id'))

    if run_local:
        process1 | "debug process1" >> beam.Map(print)
        process2 | "debug process2" >> beam.Map(print)

    #filtered = (process1, process2) | beam.Flatten() # this works
    filtered = process1 | "filter all against existing items" >> beam.Filter(filter_existing, existing=beam.pvalue.AsList(process2)) # this does NOT work when streaming, it does in batch

    if run_local:
        filtered | "debug filtered" >> beam.Map(print)
        filtered | "write down result" >> beam.io.WriteToText(os.path.join(os.path.dirname(__file__), "test_result.txt"))
    else:
        filtered | "write down result" >> beam.io.WriteToText("gs://{}/playground/output.txt".format(bucket))

    if run_local:
        print("pipeline initialized!")
    else:
        logging.info("pipeline initialized!")

Running this script as a batch job on Google's Dataflow does exactly what it needs to do. See the pipeline as visualized by Dataflow:

As soon as I set the variables to streaming = True and run_local = False, the job no longer works and returns no error at all. This is where I'm a bit lost.

If I disable the side input in this part of the script:

# this works in streaming AND batch mode
# filtered = (process1, process2) | beam.Flatten() # this works

# this does NOT work in streaming mode, but DOES work in batch mode
filtered = process1 | "filter all against existing items" >> beam.Filter(filter_existing, existing=beam.pvalue.AsList(process2))

then the streaming job on Dataflow starts working. I've narrowed the problem down to the beam.pvalue.AsList(process2) part.
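
For context, here is a minimal bounded sketch (names made up, runnable with the DirectRunner) of the semantics the batch job relies on: beam.pvalue.AsList only hands the filter its list once the side PCollection is fully computed, which a bounded Create finishes but an unwindowed PubSub read never does.

import apache_beam as beam

# Minimal bounded sketch: AsList works here because the side
# PCollection comes from a bounded Create and therefore "completes".
with beam.Pipeline() as p:
    main = p | "main" >> beam.Create([1, 2, 3, 4, 5])
    side = p | "side" >> beam.Create([2, 4])
    (main
     | beam.Filter(lambda x, excluded: x not in excluded,
                   excluded=beam.pvalue.AsList(side))
     | beam.Map(print))  # prints 1, 3, 5 (in some order)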

Needless to say, I'm quite new to Apache Beam. The one lead I could find is that the problem may be that the side input is not windowed (whatever exactly that means; I only picked it up from the docs ;))

You are correct that the problem is that the side input is not windowed. When you run a parallel operation (Map, FlatMap, Filter, ...) with a side input, the runner waits until the side input is "fully computed" before running the main input. In this case it is reading from a PubSub source with no windowing, which means it will never be "done" (i.e. more data could arrive in the future).

To make this work you need to window both sides. Then the side input becomes "done up to X", the filter can run "up to X", and X jumps forward from window boundary to window boundary.
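
A sketch of what "window both sides" means in practice (main_topic and side_topic are made-up names, not from the post; p and filter_existing are the pipeline and filter function from the script above): both PCollections get compatible window functions, so the runner can pair side-input windows with main-input windows.

import apache_beam as beam

# Illustrative only: both branches get the same FixedWindows(10), so the
# side input becomes "done" per 10-second window instead of never completing.
main = (p
        | "read main" >> beam.io.ReadFromPubSub(topic=main_topic)
        | "window main" >> beam.WindowInto(beam.window.FixedWindows(10)))
side = (p
        | "read side" >> beam.io.ReadFromPubSub(topic=side_topic)
        | "window side" >> beam.WindowInto(beam.window.FixedWindows(10)))
filtered = main | beam.Filter(filter_existing, existing=beam.pvalue.AsList(side))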

Thanks to @robertwb's answer I was able to fix my code. I changed the pipeline code after beam.io.ReadFromPubSub and added a windowing PTransform:

users = p | "Loading user" >> beam.io.ReadFromPubSub(topic=topic) \
    | beam.Map(lambda x: json.loads(x.decode())) \
    | "Window into Fixed Intervals" >> beam.WindowInto(beam.window.FixedWindows(10))

I'm not sure if this is the best choice of window for this use case, but it got me a big step further.
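
As an aside that goes beyond the original post: if the excluded-items list changes only slowly, the side input patterns in the Beam documentation describe an alternative to fixed windows, namely keeping the side input in the global window and re-firing it periodically with a processing-time trigger. A rough sketch, reusing the script's names (users, ExistingProcessor) with an assumed 60-second refresh:

import apache_beam as beam
from apache_beam.transforms import trigger, window

# Rough sketch of a slowly-updating side input: keep it in the global
# window and re-fire it after every 60 seconds of processing time, so
# the filter always sees the most recently triggered list.
process2 = (users
            | "load existing items for user" >> beam.ParDo(ExistingProcessor())
            | beam.Map(lambda x: x.get('id'))
            | "refresh periodically" >> beam.WindowInto(
                window.GlobalWindows(),
                trigger=trigger.Repeatedly(trigger.AfterProcessingTime(60)),
                accumulation_mode=trigger.AccumulationMode.DISCARDING))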