管理依赖关系——管道代码跨越多个文件
Managing Dependencies - pipeline code spans multiple files
在将“主管道代码”和“自定义转换代码”分离到多个文件后,运行 DataFlowRunner
上的流式管道遇到问题,如下所述:Multiple File Dependencies - 没有元素(发布消息)被读入管道。选项卡 - JOB LOGS、WORKER LOGS、JOB ERROR REPORTING in (new) Dataflow UI - 均未报告任何错误。职位编号:2020-04-06_15_23_52-4004061030939218807
如果有人想看...
管道最少代码(BEFORE):
pipeline.py
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
add_timestamps
是我的自定义转换
def add_timestamps(e):
payload = e.data.decode()
return {"message":payload}
当 add_timestamps
和管道代码在同一文件 pipeline.py 时,一切正常。
AFTER 我将文件重组如下:
root_dir/
pipeline.py
setup.py
my_transforms/
__init__py.py
transforms.py
其中,setup.py
import setuptools
setuptools.setup(
name='my-custom-transforms-package',
version='1.0',
install_requires=["datetime"],
packages= ['my_transforms'] #setuptools.find_packages(),
)
所有add_timestamps
转换代码移动到transforms.py(在my_transforms包目录下)
在我的 pipeline.py 中,我现在按如下方式导入和使用转换:
from my_transforms.transforms import add_timestamps
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
启动管道时我设置了标志:--setup_file=./setup.py
.
然而,没有一个元素被读入管道(如您所见,数据水印仍然存在,添加的元素(近似值)未报告任何内容)
我已经在 Dataflow 中测试了多个文件依赖项选项,对我来说它工作正常。我从 Medium.
中复制了示例
您的目录结构是正确的。您是否在 transforms.py
文件中添加了任何导入?
我建议您对 setup.py
进行一些更改:
import setuptools
REQUIRED_PACKAGES = [
‘datetime’
]
PACKAGE_NAME = 'my_transforms'
PACKAGE_VERSION = '0.0.1'
setuptools.setup(
name=PACKAGE_NAME,
version=PACKAGE_VERSION,
description='My transforms package',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages()
)
当 运行 连接您的管道时,请注意在 PipelineOptions 中设置以下字段:job_name
、project
、runner
、staging_location
、temp_location
。您必须至少指定 temp_location
或 staging_location
到 运行 您在 Google 云上的管道。如果您使用 Apache Beam SDK for Python 2.15.0 或更高版本,您还必须指定区域。请记住指定 setup.py
.
的完整路径
它看起来类似于该命令:
python3 pipeline.py \
--job_name <JOB_NAME>
--project <PROJECT_NAME> \
--runner DataflowRunner \
--region <REGION> \
--temp_location gs://<BUCKET_NAME>/temp \
--setup_file /<FULL_PATH>/setup.py
希望对您有所帮助。
我找到了根本原因...我正在设置标志 --no_use_public_ips
并且 install_requires=["datetime"]
在 setup.py
..
当然,如果没有外部 IP,工作人员将无法与 python 包管理器服务器通信以安装 datetime
。通过不设置标志 --no_use_public_ips
解决问题(稍后我将查看如何为工作人员禁用外部 IP 并且仍然能够成功 运行 的解决方案)。如果至少在 Job/Worker 日志中显示一些错误消息就好了!花了 2-3 天的时间进行故障排除:=)
在将“主管道代码”和“自定义转换代码”分离到多个文件后,运行 DataFlowRunner
上的流式管道遇到问题,如下所述:Multiple File Dependencies - 没有元素(发布消息)被读入管道。选项卡 - JOB LOGS、WORKER LOGS、JOB ERROR REPORTING in (new) Dataflow UI - 均未报告任何错误。职位编号:2020-04-06_15_23_52-4004061030939218807
如果有人想看...
管道最少代码(BEFORE): pipeline.py
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
add_timestamps
是我的自定义转换
def add_timestamps(e):
payload = e.data.decode()
return {"message":payload}
当 add_timestamps
和管道代码在同一文件 pipeline.py 时,一切正常。
AFTER 我将文件重组如下:
root_dir/
pipeline.py
setup.py
my_transforms/
__init__py.py
transforms.py
其中,setup.py
import setuptools
setuptools.setup(
name='my-custom-transforms-package',
version='1.0',
install_requires=["datetime"],
packages= ['my_transforms'] #setuptools.find_packages(),
)
所有add_timestamps
转换代码移动到transforms.py(在my_transforms包目录下)
在我的 pipeline.py 中,我现在按如下方式导入和使用转换:
from my_transforms.transforms import add_timestamps
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
启动管道时我设置了标志:--setup_file=./setup.py
.
然而,没有一个元素被读入管道(如您所见,数据水印仍然存在,添加的元素(近似值)未报告任何内容)
我已经在 Dataflow 中测试了多个文件依赖项选项,对我来说它工作正常。我从 Medium.
中复制了示例您的目录结构是正确的。您是否在 transforms.py
文件中添加了任何导入?
我建议您对 setup.py
进行一些更改:
import setuptools
REQUIRED_PACKAGES = [
‘datetime’
]
PACKAGE_NAME = 'my_transforms'
PACKAGE_VERSION = '0.0.1'
setuptools.setup(
name=PACKAGE_NAME,
version=PACKAGE_VERSION,
description='My transforms package',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages()
)
当 运行 连接您的管道时,请注意在 PipelineOptions 中设置以下字段:job_name
、project
、runner
、staging_location
、temp_location
。您必须至少指定 temp_location
或 staging_location
到 运行 您在 Google 云上的管道。如果您使用 Apache Beam SDK for Python 2.15.0 或更高版本,您还必须指定区域。请记住指定 setup.py
.
它看起来类似于该命令:
python3 pipeline.py \
--job_name <JOB_NAME>
--project <PROJECT_NAME> \
--runner DataflowRunner \
--region <REGION> \
--temp_location gs://<BUCKET_NAME>/temp \
--setup_file /<FULL_PATH>/setup.py
希望对您有所帮助。
我找到了根本原因...我正在设置标志 --no_use_public_ips
并且 install_requires=["datetime"]
在 setup.py
..
当然,如果没有外部 IP,工作人员将无法与 python 包管理器服务器通信以安装 datetime
。通过不设置标志 --no_use_public_ips
解决问题(稍后我将查看如何为工作人员禁用外部 IP 并且仍然能够成功 运行 的解决方案)。如果至少在 Job/Worker 日志中显示一些错误消息就好了!花了 2-3 天的时间进行故障排除:=)