NameError: name 'beam' is not defined in <lambda>
NameError: name 'beam' is not defined in <lambda>
我正在 运行 在 google 数据流中设置光束管道。很长一段时间以来,它一直运行良好,没有任何问题。现在,在不更改任何代码的情况下,当我 运行 将其提交给 google 数据流时,我收到此错误:
wrapper = lambda x: [fn(x)]
File "read_from_gcs.py", line 391, in <lambda>
NameError: name 'beam' is not defined [while running 'Covert to Row-ptransform-2054']
我已经将 apachea_beam 导入为光束(因为我用它来调用所有转换)
错误指向以下代码:
schema_for_dedup = (
distinct_without_chain | 'Filter nulls' >> beam.Filter(lambda r: r['key1'] != None and r['key2'] != None and r['key3'] != None)
| 'Covert to Row' >> beam.Map(lambda val: beam.Row(
k= val['k'],
k2= val['k2'],
))
)
奇怪的是,当我在本地 运行 作业时,它 运行 成功地没有任何问题。但是当我使用 DataflowRunner 将它提交给数据流时,它会显示这些错误。我正在使用 apache-beam 2.32.0
最近 google 数据流的基础架构是否发生了一些变化?怎么可能几天前还可以,而且在我的本地计算机上运行良好,但现在却出现这个错误?
以前有人遇到过这个问题吗?
这是__main__
模块吗?这可能是 beam
在远程计算机上 运行 时未在您的 lambda 中定义的原因。您可以尝试传递 --save_main_session
管道选项,或者将所有代码放入您在主模块中导入的另一个模块中,或者将您的导入放入一个函数中,例如
def run():
import apache_beam as beam
[build your pipline as normal]
至于第二个错误,europe-west4 与 europe-west3 应该无关紧要,但是 TypeError: '_UnwindowedValues'
表示您正在尝试将 [index]
与 GroupBykey 的结果一起使用通常只是一个可迭代的,而不是一个列表(因为它可能不适合内存)。而不是
def process_gbk_results(key, values):
# do something with values[0]
input | beam.GroupByKey() | beam.MapTuple(process_gbk_results)
做类似的事情
def process_gbk_results(key, values):
for value in values:
# do something with value
或
def process_gbk_results(key, values):
# do something with next(iter(values))
或
def process_gbk_results(key, values):
values = list(values)
# do something with values[0]
我正在 运行 在 google 数据流中设置光束管道。很长一段时间以来,它一直运行良好,没有任何问题。现在,在不更改任何代码的情况下,当我 运行 将其提交给 google 数据流时,我收到此错误:
wrapper = lambda x: [fn(x)]
File "read_from_gcs.py", line 391, in <lambda>
NameError: name 'beam' is not defined [while running 'Covert to Row-ptransform-2054']
我已经将 apachea_beam 导入为光束(因为我用它来调用所有转换) 错误指向以下代码:
schema_for_dedup = (
distinct_without_chain | 'Filter nulls' >> beam.Filter(lambda r: r['key1'] != None and r['key2'] != None and r['key3'] != None)
| 'Covert to Row' >> beam.Map(lambda val: beam.Row(
k= val['k'],
k2= val['k2'],
))
)
奇怪的是,当我在本地 运行 作业时,它 运行 成功地没有任何问题。但是当我使用 DataflowRunner 将它提交给数据流时,它会显示这些错误。我正在使用 apache-beam 2.32.0
最近 google 数据流的基础架构是否发生了一些变化?怎么可能几天前还可以,而且在我的本地计算机上运行良好,但现在却出现这个错误?
以前有人遇到过这个问题吗?
这是__main__
模块吗?这可能是 beam
在远程计算机上 运行 时未在您的 lambda 中定义的原因。您可以尝试传递 --save_main_session
管道选项,或者将所有代码放入您在主模块中导入的另一个模块中,或者将您的导入放入一个函数中,例如
def run():
import apache_beam as beam
[build your pipline as normal]
至于第二个错误,europe-west4 与 europe-west3 应该无关紧要,但是 TypeError: '_UnwindowedValues'
表示您正在尝试将 [index]
与 GroupBykey 的结果一起使用通常只是一个可迭代的,而不是一个列表(因为它可能不适合内存)。而不是
def process_gbk_results(key, values):
# do something with values[0]
input | beam.GroupByKey() | beam.MapTuple(process_gbk_results)
做类似的事情
def process_gbk_results(key, values):
for value in values:
# do something with value
或
def process_gbk_results(key, values):
# do something with next(iter(values))
或
def process_gbk_results(key, values):
values = list(values)
# do something with values[0]