数据流作业中的 ModuleNotFoundError
ModuleNotFoundError in Dataflow job
我正在尝试在 Google Cloud Platform 中将 apache beam 管道作为数据流作业执行。
我的项目结构如下:
root_dir/
__init__.py
setup.py
main.py
utils/
__init__.py
log_util.py
config_util.py
这是我的 setup.py
setuptools.setup(
name='dataflow_example',
version='1.0',
install_requires=[
"google-cloud-tasks==2.2.0",
"google-cloud-pubsub>=0.1.0",
"google-cloud-storage==1.39.0",
"google-cloud-bigquery==2.6.2",
"google-cloud-secret-manager==2.0.0",
"google-api-python-client==2.3.0",
"oauth2client==4.1.3",
"apache-beam[gcp]>=2.20.0",
"wheel>=0.36.2"
],
packages=setuptools.find_packages()
)
这是我的管道代码:
import math
import apache_beam as beam
from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions
from utils.log_util import LogUtil
from utils.config_util import ConfigUtil
class DataflowExample:
config = {}
def __init__(self):
self.config = ConfigUtil.get_config(module_config=["config"])
self.project = self.config['project']
self.region = self.config['location']
self.bucket = self.config['core_bucket']
self.batch_size = 10
def execute_pipeline(self):
try:
LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")
query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"
beam_options = {
"project": self.project,
"region": self.region,
"job_name": "dataflow_example",
"runner": "DataflowRunner",
"temp_location": f"gs://{self.bucket}/temp_location/"
}
options = PipelineOptions(**beam_options, save_main_session=True)
with beam.Pipeline(options=options) as pipeline:
data = (
pipeline
| 'Read from BQ ' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
| 'Count records' >> beam.combiners.Count.Globally()
| 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
)
LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
except Exception as e:
LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")
class PrintCount(beam.DoFn):
def __init__(self):
self.logger = LogUtil()
def process(self, row_count, batch_size):
try:
current_date = datetime.today().date()
total = int(math.ceil(row_count / batch_size))
self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")
self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
except Exception as e:
self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process - {str(e)}")
if __name__ == "__main__":
df_example = DataflowExample()
df_example.execute_pipeline()
管道的功能是
- 查询 BigQuery Table。
- 计算从查询中获取的记录总数。
- 使用 utils 文件夹中的自定义日志模块进行打印。
我运行工作使用云shell使用command - python3 - main.py
尽管 Dataflow 作业启动,工作节点在几分钟后抛出错误,提示“ModuleNotFoundError:没有名为 'utils' 的模块”
“utils”文件夹可用,当使用“DirectRunner”执行时,相同的代码工作正常。
log_util
和 config_util
文件分别是用于日志记录和配置获取的自定义实用程序文件。
此外,我尝试了 运行 setup_file
选项作为 python3 - main.py --setup_file </path/of/setup.py>
这使得作业只是冻结并且即使在 15 分钟后也不会继续。
如何使用“DataflowRunner”解决 ModuleNotFoundError?
作为社区 wiki 发布。正如@GopinathS 所确认的,错误和修复如下:
工人遇到的错误是Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed
。
为了解决这个问题 "apache-beam[gcp]>=2.20.0" 已从 setup.py 的 install_requires
中移除 因为,'>=' 正在分配最新的可用版本(撰写本文时为 2.32.0),而工作人员版本仅为 2.28.0.
已更新setup.py:
setuptools.setup(
name='dataflow_example',
version='1.0',
install_requires=[
"google-cloud-tasks==2.2.0",
"google-cloud-pubsub>=0.1.0",
"google-cloud-storage==1.39.0",
"google-cloud-bigquery==2.6.2",
"google-cloud-secret-manager==2.0.0",
"google-api-python-client==2.3.0",
"oauth2client==4.1.3", # removed apache-beam[gcp]>=2.20.0
"wheel>=0.36.2"
],
packages=setuptools.find_packages()
)
在管道代码中更新了 beam_options
:
beam_options = {
"project": self.project,
"region": self.region,
"job_name": "dataflow_example",
"runner": "DataflowRunner",
"temp_location": f"gs://{self.bucket}/temp_location/",
"setup_file": "./setup.py"
}
还要确保一次传递所有管道选项,而不是部分传递。
如果您在命令中传递 --setup_file </path/of/setup.py>
,请确保使用代码中的 argument_parser
读取安装文件路径并将其附加到已定义的 beam_options
变量中。
为了避免解析参数并附加到 beam_options
中,我直接将其添加到 beam_options
中,作为 "setup_file": "./setup.py"
我正在尝试在 Google Cloud Platform 中将 apache beam 管道作为数据流作业执行。
我的项目结构如下:
root_dir/
__init__.py
setup.py
main.py
utils/
__init__.py
log_util.py
config_util.py
这是我的 setup.py
setuptools.setup(
name='dataflow_example',
version='1.0',
install_requires=[
"google-cloud-tasks==2.2.0",
"google-cloud-pubsub>=0.1.0",
"google-cloud-storage==1.39.0",
"google-cloud-bigquery==2.6.2",
"google-cloud-secret-manager==2.0.0",
"google-api-python-client==2.3.0",
"oauth2client==4.1.3",
"apache-beam[gcp]>=2.20.0",
"wheel>=0.36.2"
],
packages=setuptools.find_packages()
)
这是我的管道代码:
import math
import apache_beam as beam
from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions
from utils.log_util import LogUtil
from utils.config_util import ConfigUtil
class DataflowExample:
config = {}
def __init__(self):
self.config = ConfigUtil.get_config(module_config=["config"])
self.project = self.config['project']
self.region = self.config['location']
self.bucket = self.config['core_bucket']
self.batch_size = 10
def execute_pipeline(self):
try:
LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")
query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"
beam_options = {
"project": self.project,
"region": self.region,
"job_name": "dataflow_example",
"runner": "DataflowRunner",
"temp_location": f"gs://{self.bucket}/temp_location/"
}
options = PipelineOptions(**beam_options, save_main_session=True)
with beam.Pipeline(options=options) as pipeline:
data = (
pipeline
| 'Read from BQ ' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
| 'Count records' >> beam.combiners.Count.Globally()
| 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
)
LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
except Exception as e:
LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")
class PrintCount(beam.DoFn):
def __init__(self):
self.logger = LogUtil()
def process(self, row_count, batch_size):
try:
current_date = datetime.today().date()
total = int(math.ceil(row_count / batch_size))
self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")
self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
except Exception as e:
self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process - {str(e)}")
if __name__ == "__main__":
df_example = DataflowExample()
df_example.execute_pipeline()
管道的功能是
- 查询 BigQuery Table。
- 计算从查询中获取的记录总数。
- 使用 utils 文件夹中的自定义日志模块进行打印。
我运行工作使用云shell使用command - python3 - main.py
尽管 Dataflow 作业启动,工作节点在几分钟后抛出错误,提示“ModuleNotFoundError:没有名为 'utils' 的模块”
“utils”文件夹可用,当使用“DirectRunner”执行时,相同的代码工作正常。
log_util
和 config_util
文件分别是用于日志记录和配置获取的自定义实用程序文件。
此外,我尝试了 运行 setup_file
选项作为 python3 - main.py --setup_file </path/of/setup.py>
这使得作业只是冻结并且即使在 15 分钟后也不会继续。
如何使用“DataflowRunner”解决 ModuleNotFoundError?
作为社区 wiki 发布。正如@GopinathS 所确认的,错误和修复如下:
工人遇到的错误是Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed
。
为了解决这个问题 "apache-beam[gcp]>=2.20.0" 已从 setup.py 的 install_requires
中移除 因为,'>=' 正在分配最新的可用版本(撰写本文时为 2.32.0),而工作人员版本仅为 2.28.0.
已更新setup.py:
setuptools.setup(
name='dataflow_example',
version='1.0',
install_requires=[
"google-cloud-tasks==2.2.0",
"google-cloud-pubsub>=0.1.0",
"google-cloud-storage==1.39.0",
"google-cloud-bigquery==2.6.2",
"google-cloud-secret-manager==2.0.0",
"google-api-python-client==2.3.0",
"oauth2client==4.1.3", # removed apache-beam[gcp]>=2.20.0
"wheel>=0.36.2"
],
packages=setuptools.find_packages()
)
在管道代码中更新了 beam_options
:
beam_options = {
"project": self.project,
"region": self.region,
"job_name": "dataflow_example",
"runner": "DataflowRunner",
"temp_location": f"gs://{self.bucket}/temp_location/",
"setup_file": "./setup.py"
}
还要确保一次传递所有管道选项,而不是部分传递。
如果您在命令中传递 --setup_file </path/of/setup.py>
,请确保使用代码中的 argument_parser
读取安装文件路径并将其附加到已定义的 beam_options
变量中。
为了避免解析参数并附加到 beam_options
中,我直接将其添加到 beam_options
中,作为 "setup_file": "./setup.py"