Apache Beam Side Inputs not working in Streaming Dataflow Pipeline with Python SDK
I'm working on a larger Dataflow pipeline that works fine in batch mode, but the refactoring I just finished has a problem with side inputs: if I put the pipeline into streaming mode and remove the side input, the pipeline runs perfectly on Google's Dataflow.
I stripped everything down and built the following short script that encapsulates the problem and lets me play around with it.
import apache_beam as beam
import os
import json
import sys
import logging

""" Here the switches """
run_local = False
streaming = False

project = 'google-project-name'
bucket = 'dataflow_bucket'
tmp_location = 'gs://{}/{}/'.format(bucket, "tmp")
topic = "projects/{}/topics/dataflowtopic".format(project)

credentials_file = os.path.join(os.path.dirname(__file__), "credentials.json")
if os.path.exists(credentials_file):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_file

if not run_local:
    runner = "DataflowRunner"
else:
    runner = "DirectRunner"

argv = [
    '--region={0}'.format("europe-west6"),
    '--project={0}'.format(project),
    '--temp_location={0}'.format(tmp_location),
    '--runner={0}'.format(runner),
    '--save_main_session',
    '--max_num_workers=2'
]
if streaming:
    argv.append('--streaming')


def filter_existing(item, existing=None, id_field: str = 'id'):
    sys.stdout.write(".")
    if not item.get(id_field) in existing:
        return True
    return False


class UserProcessor(beam.DoFn):
    def process(self, user, **kwargs):
        if run_local:
            print("processing a user and getting items for it")
        else:
            logging.info("processing a user and getting items for it")
        for i in range(10):
            yield {"id": i, "item_name": "dingdong", "user": user}


class ExistingProcessor(beam.DoFn):
    def process(self, user, **kwargs):
        if run_local:
            print("creating the list to exclude items from the result")
        else:
            logging.info("creating the list to exclude items from the result")
        yield {"id": 3}
        yield {"id": 5}
        yield {"id": 9}


with beam.Pipeline(argv=argv) as p:
    if streaming:
        if run_local:
            print("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        else:
            logging.info("Waiting for Pub/Sub Message on Topic: {}".format(topic))
        users = p | "Loading user" >> beam.io.ReadFromPubSub(topic=topic) | beam.Map(lambda x: json.loads(x.decode()))
    else:
        if run_local:
            print("Loading Demo User")
        else:
            logging.info("Loading Demo User")
        example_user = {"id": "indi", "name": "Indiana Jones"}
        users = p | "Loading user" >> beam.Create([example_user])

    process1 = users | "load all items for user" >> beam.ParDo(UserProcessor().with_input_types(dict))
    process2 = users | "load existing items for user" >> beam.ParDo(ExistingProcessor().with_input_types(dict)) | beam.Map(lambda x: x.get('id'))

    if run_local:
        process1 | "debug process1" >> beam.Map(print)
        process2 | "debug process2" >> beam.Map(print)

    # filtered = (process1, process2) | beam.Flatten()  # this works
    filtered = process1 | "filter all against existing items" >> beam.Filter(filter_existing, existing=beam.pvalue.AsList(process2))  # this does NOT work when streaming, it does in batch

    if run_local:
        filtered | "debug filtered" >> beam.Map(print)
        filtered | "write down result" >> beam.io.WriteToText(os.path.join(os.path.dirname(__file__), "test_result.txt"))
    else:
        filtered | "write down result" >> beam.io.WriteToText("gs://{}/playground/output.txt".format(bucket))

if run_local:
    print("pipeline initialized!")
else:
    logging.info("pipeline initialized!")
Running this script as a batch job on Google's Dataflow does exactly what it needs to do; the pipeline graph in the Dataflow UI confirms it.
As soon as I set the variables streaming = True and run_local = False, the job no longer works and returns no error at all. This is where I'm a bit lost.
If I disable the side input in this part of the script:
# this works in streaming AND batch mode
# filtered = (process1, process2) | beam.Flatten()
# this does NOT work in streaming mode, but DOES work in batch mode
filtered = process1 | "filter all against existing items" >> beam.Filter(filter_existing, existing=beam.pvalue.AsList(process2))
then the streaming job on Dataflow starts working. I have narrowed the problem down to the beam.pvalue.AsList(process2) part.
Needless to say, I'm quite new to Apache Beam. One thing I could figure out is that the problem is that the side input is not windowed (whatever that means exactly; I just picked it up from the docs ;))
You are correct that the problem is that the side input is not windowed. When you run a parallel operation (Map, FlatMap, Filter, ...) with a side input, the runner waits until the side input is "fully computed" before running the main input. In this case, it is reading from a PubSub source with no windowing, which means the side input is never "done" (i.e. more data could arrive in the future).
To make this work, you need to window both sides. The side input then becomes "done up until time X", and the filter can run "up until time X" as X jumps forward from window boundary to window boundary.
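To make that concrete, here is a minimal sketch (not part of the original answer) of what windowing both sides looks like, reusing p, topic, filter_existing, UserProcessor and ExistingProcessor from the question. Because both branches fan out from the same windowed PCollection, the main input and the side input share the same fixed windows:
# Sketch only: window the unbounded source once; everything downstream,
# including the AsList side input, inherits these 10-second fixed windows.
users = (p
         | "Loading user" >> beam.io.ReadFromPubSub(topic=topic)
         | "Decode" >> beam.Map(lambda x: json.loads(x.decode()))
         | "Fixed windows" >> beam.WindowInto(beam.window.FixedWindows(10)))

process1 = users | "load all items" >> beam.ParDo(UserProcessor())
process2 = (users
            | "load existing items" >> beam.ParDo(ExistingProcessor())
            | beam.Map(lambda x: x.get('id')))

# The side input is now "done" at each window boundary, so the filter
# fires per window instead of waiting forever.
filtered = process1 | "filter against existing" >> beam.Filter(
    filter_existing, existing=beam.pvalue.AsList(process2))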
Thanks to @robertwb's answer I was able to fix my code. I changed the pipeline code after beam.io.ReadFromPubSub and added a windowing PTransform:
users = p | "Loading user" >> beam.io.ReadFromPubSub(topic=topic) | beam.Map(lambda x: json.loads(x.decode())) \
          | "Window into Fixed Intervals" >> beam.WindowInto(beam.window.FixedWindows(10))
I'm not sure if this is the best choice of window for this use case, but it got me a big step forward.
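For completeness: if the exclusion list changes only occasionally, another option (not from this thread, but the usual "slowly updating side input" Beam pattern) is to keep the side input in the global window and re-trigger it periodically instead of closing fixed windows. A hedged sketch, with an arbitrary 10-second processing-time delay:
from apache_beam.transforms import trigger

# Sketch only: re-emit the side input periodically in the global window.
# ACCUMULATING means each firing contains every id seen so far, which
# matches the exclusion-list semantics; process1 can keep its own windows.
existing_ids = process2 | "Retrigger side input" >> beam.WindowInto(
    beam.window.GlobalWindows(),
    trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)

filtered = process1 | "filter against existing" >> beam.Filter(
    filter_existing, existing=beam.pvalue.AsList(existing_ids))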