数据流作业中的 ModuleNotFoundError

ModuleNotFoundError in Dataflow job

我正在尝试在 Google Cloud Platform 中将 apache beam 管道作为数据流作业执行。

我的项目结构如下:

root_dir/
  __init__.py
  ​setup.py
  ​main.py
  ​utils/
    __init__.py
    log_util.py
    config_util.py

这是我的 setup.py

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

这是我的管道代码:

import math
import apache_beam as beam

from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions

from utils.log_util import LogUtil
from utils.config_util import ConfigUtil


class DataflowExample:
    config = {}

    def __init__(self):
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        try:
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")

            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"

            beam_options = {
                "project": self.project,
                "region": self.region,
                "job_name": "dataflow_example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline:
                data = (
                        pipeline
                        | 'Read from BQ ' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
                        | 'Count records' >> beam.combiners.Count.Globally()
                        | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )

            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")


class PrintCount(beam.DoFn):

    def __init__(self):
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        try:
            current_date = datetime.today().date()
            total = int(math.ceil(row_count / batch_size))

            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")

            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process  - {str(e)}")


if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()

管道的功能是

  1. 查询 BigQuery Table。
  2. 计算从查询中获取的记录总数。
  3. 使用 utils 文件夹中的自定义日志模块进行打印。

我运行工作使用云shell使用command - python3 - main.py

尽管 Dataflow 作业启动,工作节点在几分钟后抛出错误,提示“ModuleNotFoundError:没有名为 'utils' 的模块”

“utils”文件夹可用,当使用“DirectRunner”执行时,相同的代码工作正常。

log_utilconfig_util 文件分别是用于日志记录和配置获取的自定义实用程序文件。

此外,我尝试了 运行 setup_file 选项作为 python3 - main.py --setup_file </path/of/setup.py> 这使得作业只是冻结并且即使在 15 分钟后也不会继续。

如何使用“DataflowRunner”解决 ModuleNotFoundError?

作为社区 wiki 发布。正如@GopinathS 所确认的,错误和修复如下:

工人遇到的错误是Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed

为了解决这个问题 "apache-beam[gcp]>=2.20.0" 已从 setup.py 的 install_requires 中移除 因为,'>=' 正在分配最新的可用版本(撰写本文时为 2.32.0),而工作人员版本仅为 2.28.0.

已更新setup.py:

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3", # removed apache-beam[gcp]>=2.20.0
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

在管道代码中更新了 beam_options

    beam_options = {
        "project": self.project,
        "region": self.region,
        "job_name": "dataflow_example",
        "runner": "DataflowRunner",
        "temp_location": f"gs://{self.bucket}/temp_location/",
        "setup_file": "./setup.py"
    }

还要确保一次传递所有管道选项,而不是部分传递。

如果您在命令中传递 --setup_file </path/of/setup.py>,请确保使用代码中的 argument_parser 读取安装文件路径并将其附加到已定义的 beam_options 变量中。

为了避免解析参数并附加到 beam_options 中,我直接将其添加到 beam_options 中,作为 "setup_file": "./setup.py"