Including a custom PTransform causes missing-dependency errors in a Dataflow job in GCP
I am trying to create a composite PTransform as follows (Python):
class LimitVolume(beam.PTransform):
    def __init__(self, daily_window, daily_limit):
        super().__init__()
        self.daily_window = daily_window
        self.daily_limit = daily_limit

    def expand(self, input_events_pcoll):
        events_with_ts_pcol = (input_events_pcoll
            | 'Timestamp using RECEIVED_TIMESTAMP' >> beam.Map(
                lambda message: beam.window.TimestampedValue(message, message['RECEIVED_TIMESTAMP']))
        )
        ...
        return events_with_ts_pcol
I then use it in the main run() method like this:
def run():
    ...
    result_pcol = input_pcol | LimitVolume(daily_window, daily_limit)
Both run() and LimitVolume are in the same main.py script, which is then submitted/deployed as a job to GCP.
When I run this job locally via the DirectRunner, everything works fine; if I submit and run it in GCP with the DataflowRunner, it starts throwing errors like these:
in process NameError: name 'arrow' is not defined [while running 'Parse Json-ptransform-898945']
and in <lambda> NameError: name 'time' is not defined [while running 'define schedule-ptransform-899107']
Basically, a whole bunch of dependencies are not found, even though they are all defined in the requirements.txt file and specified via the --requirements_file option when the job is deployed.
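For context, the job is deployed with options along these lines (a sketch; the project, region, and bucket names are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',                 # placeholder
    region='us-central1',                 # placeholder
    temp_location='gs://my-bucket/tmp',   # placeholder
    requirements_file='requirements.txt',
)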
See the full (abridged) error stack trace below.
Now, the punchline:
If I put the same logic from the LimitVolume PTransform directly into the run() method and specify it inline in my pipeline:
def run():
    ...
    events_with_ts_pcol = (input_pcol
        | 'Timestamp using RECEIVED_TIMESTAMP' >> beam.Map(
            lambda message: beam.window.TimestampedValue(message, message['RECEIVED_TIMESTAMP']))
    )
...
and remove the definition of the LimitVolume class from the main.py file, it works fine both locally and in GCP! No dependency issues.
So, clearly, there is something very "special" about merely having a custom PTransform present in the pipeline. Does anyone know what that might be?
I could not find any information about custom PTransforms, the details of how they get packaged, or errors like this, which is worrying in itself...
Thank you!!
Here is a larger excerpt of the error output:
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/src/main.py", line 45, in process NameError: name 'arrow' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 997, in process_bundle element.data) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 351, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/src/main.py", line 45, in process NameError: name 'arrow' is not defined [while running 'Parse Json-ptransform-898945'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:771
...
line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1562, in <lambda>
    wrapper = lambda x: [fn(x)]
File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/src/main.py", line 273, in <lambda>
NameError: name 'time' is not defined [while running 'define schedule-ptransform-899107']
passed through:
==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:771
This sounds like an issue with capturing/pickling objects in the __main__ session. You could try passing the save_main_session flag. Personally, I prefer the solution of putting all required objects from the main module (imports, definitions, etc.) inside the run() method, to make sure they get captured correctly.
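For illustration, a minimal sketch of that second approach, using the arrow and time modules from the error messages (the pipeline itself is invented):

import apache_beam as beam

def run():
    # Import inside run() so the lambda below closes over these names and
    # they travel with the pickled function to the workers, instead of
    # relying on __main__ globals being captured.
    import time
    import arrow  # third-party; must still be listed in requirements.txt

    with beam.Pipeline() as p:
        (p
         | beam.Create(['2021-01-01T00:00:00+00:00'])
         # arrow>=1.0 API: timestamp() is a method
         | beam.Map(lambda s: (arrow.get(s).timestamp(), time.time())))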
Also note that the ongoing effort to migrate to cloudpickle should avoid these limitations.
There is a known issue in Apache Beam [1]:
Using --save_main_session fails on Python 3 when main module has invocations of superclass method using 'super'.
The issue has been open for several years and remains unfixed because of Dill, one of Beam's dependencies. The corresponding issue on the Dill side can be found in its GitHub issues [2].
As mentioned in one of the comments in the GitHub issue here [3], the workaround is:
for a class declared in main, e.g.:
class MyClass(SuperClass)
when calling the init function of the parent class:
super().__init__()
an explicit class name should be used instead:
SuperClass.__init__(self)
(The zero-argument super() relies on an implicit __class__ closure cell, which Dill does not pickle correctly for classes defined in __main__; the explicit call avoids that cell.)
In your code, the change would be:
class LimitVolume(beam.PTransform):
    def __init__(self, daily_window, daily_limit):
        beam.PTransform.__init__(self)
        ...
Meanwhile, the error NameError: name 'time' is not defined may also be related to another quirk of Apache Beam's Python dependency import mechanism. As @robertwb mentioned, if the problem occurs in the __main__ session, you can set the --save_main_session pipeline option to True.
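For reference, a minimal sketch of setting that option programmatically rather than on the command line (not taken from the question's code):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions()  # or build from sys.argv flags
# Equivalent to passing --save_main_session on the command line:
options.view_as(SetupOptions).save_main_session = True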
However, if the error occurs outside of it, you can work around it by importing the module in the place where it is used (credit to the Google Dataflow documentation [4]).
For example, instead of:
import re
…
def myfunc():
    # use re module
use:
def myfunc():
    import re
    # use re module
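(This pattern is cheap in practice: Python caches imported modules in sys.modules, so repeating the import inside the function adds negligible overhead after the first call.)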
[1] https://issues.apache.org/jira/browse/BEAM-6158
[2] https://github.com/uqfoundation/dill/issues/300
[3] https://github.com/uqfoundation/dill/issues/300#issuecomment-505011149
[4] https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors