管理依赖关系——管道代码跨越多个文件

Managing Dependencies - pipeline code spans multiple files

在将“主管道代码”和“自定义转换代码”分离到多个文件后,运行 DataFlowRunner 上的流式管道遇到问题,如下所述:Multiple File Dependencies - 没有元素(发布消息)被读入管道。选项卡 - JOB LOGS、WORKER LOGS、JOB ERROR REPORTING in (new) Dataflow UI - 均未报告任何错误。职位编号:2020-04-06_15_23_52-4004061030939218807如果有人想看...

管道最少代码(BEFORE): pipeline.py

row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
        | "add_timestamps" >> beam.Map(add_timestamps)

add_timestamps 是我的自定义转换

def add_timestamps(e):
    payload = e.data.decode()
    return {"message":payload}

add_timestamps 和管道代码在同一文件 pipeline.py 时,一切正常。

AFTER 我将文件重组如下:

root_dir/
   pipeline.py
   setup.py
   my_transforms/
      __init__py.py
      transforms.py

其中,setup.py

import setuptools
setuptools.setup(
   name='my-custom-transforms-package',
   version='1.0',
   install_requires=["datetime"],
   packages= ['my_transforms'] #setuptools.find_packages(),
)

所有add_timestamps转换代码移动到transforms.py(在my_transforms包目录下)

在我的 pipeline.py 中,我现在按如下方式导入和使用转换:

from my_transforms.transforms import add_timestamps
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
        | "add_timestamps" >> beam.Map(add_timestamps)

启动管道时我设置了标志:--setup_file=./setup.py.

然而,没有一个元素被读入管道(如您所见,数据水印仍然存在,添加的元素(近似值)未报告任何内容)

我已经在 Dataflow 中测试了多个文件依赖项选项,对我来说它工作正常。我从 Medium.

中复制了示例

您的目录结构是正确的。您是否在 transforms.py 文件中添加了任何导入?

我建议您对 setup.py 进行一些更改:

import setuptools

REQUIRED_PACKAGES = [
    ‘datetime’
]

PACKAGE_NAME = 'my_transforms'
PACKAGE_VERSION = '0.0.1'

setuptools.setup(
   name=PACKAGE_NAME,
   version=PACKAGE_VERSION,
   description='My transforms package',
   install_requires=REQUIRED_PACKAGES,
   packages=setuptools.find_packages()
)

当 运行 连接您的管道时,请注意在 PipelineOptions 中设置以下字段:job_nameprojectrunnerstaging_locationtemp_location。您必须至少指定 temp_locationstaging_location 到 运行 您在 Google 云上的管道。如果您使用 Apache Beam SDK for Python 2.15.0 或更高版本,您还必须指定区域。请记住指定 setup.py.

的完整路径

它看起来类似于该命令:

python3 pipeline.py \
--job_name <JOB_NAME>
--project <PROJECT_NAME> \
--runner DataflowRunner \
--region <REGION> \
--temp_location gs://<BUCKET_NAME>/temp \
--setup_file /<FULL_PATH>/setup.py

希望对您有所帮助。

我找到了根本原因...我正在设置标志 --no_use_public_ips 并且 install_requires=["datetime"]setup.py..

当然,如果没有外部 IP,工作人员将无法与 python 包管理器服务器通信以安装 datetime。通过不设置标志 --no_use_public_ips 解决问题(稍后我将查看如何为工作人员禁用外部 IP 并且仍然能够成功 运行 的解决方案)。如果至少在 Job/Worker 日志中显示一些错误消息就好了!花了 2-3 天的时间进行故障排除:=)