NameError: name 'beam' is not defined in <lambda>

NameError: name 'beam' is not defined in <lambda>

我正在 运行 在 google 数据流中设置光束管道。很长一段时间以来,它一直运行良好,没有任何问题。现在,在不更改任何代码的情况下,当我 运行 将其提交给 google 数据流时,我收到此错误:

wrapper = lambda x: [fn(x)]
File "read_from_gcs.py", line 391, in <lambda>
NameError: name 'beam' is not defined [while running 'Covert to Row-ptransform-2054']

我已经将 apachea_beam 导入为光束(因为我用它来调用所有转换) 错误指向以下代码:

schema_for_dedup = (
            distinct_without_chain | 'Filter nulls' >> beam.Filter(lambda r: r['key1'] != None and r['key2'] != None and r['key3'] != None)
            | 'Covert to Row' >> beam.Map(lambda val: beam.Row(
                             k= val['k'],
                             k2= val['k2'],
                             ))

        )

奇怪的是,当我在本地 运行 作业时,它 运行 成功地没有任何问题。但是当我使用 DataflowRunner 将它提交给数据流时,它会显示这些错误。我正在使用 apache-beam 2.32.0

最近 google 数据流的基础架构是否发生了一些变化?怎么可能几天前还可以,而且在我的本地计算机上运行良好,但现在却出现这个错误?

以前有人遇到过这个问题吗?

这是__main__模块吗?这可能是 beam 在远程计算机上 运行 时未在您的 lambda 中定义的原因。您可以尝试传递 --save_main_session 管道选项,或者将所有代码放入您在主模块中导入的另一个模块中,或者将您的导入放入一个函数中,例如

def run():
  import apache_beam as beam
  [build your pipline as normal]

至于第二个错误,europe-west4 与 europe-west3 应该无关紧要,但是 TypeError: '_UnwindowedValues' 表示您正在尝试将 [index] 与 GroupBykey 的结果一起使用通常只是一个可迭代的,而不是一个列表(因为它可能不适合内存)。而不是

def process_gbk_results(key, values):
  # do something with values[0]

input | beam.GroupByKey() | beam.MapTuple(process_gbk_results)

做类似的事情

def process_gbk_results(key, values):
  for value in values:
    # do something with value

def process_gbk_results(key, values):
  # do something with next(iter(values))

def process_gbk_results(key, values):
  values = list(values)
  # do something with values[0]