如何在 Apache Beam Google DataFlow runner 中使用 matplotlib 模块

How to use matplotlib module in Apache Beam Google DataFlow runner

是否可以从 google 数据流(光束)中获取 matplotlib 模块?我有它在我的 requirements.txt:

matplotlib==2.0.2

但还是报错:

ImportError: No module named matplotlib

谢谢!

要准备自定义 DataFlow worker,您应该提供 setup.py 文件,其中包含安装所需包的命令。 首先,创建如下所示的 setup.py 文件(这是一个通用的 setup.py 文件)。 您应该在 REQUIRED_PACKAGES 变量中列出您的包,或者像我一样将 pip install matplotlib==2.0.2 放在 CUSTOM_COMMANDS 中。

请注意,matplotlib 需要在系统中安装一些额外的 packages/libraries,因此您也需要通过为它们指定安装命令来安装它们。此外,如果您想在 DataFlow 作业中渲染绘图,您需要 配置 matplotlib 后端 为一个能够写入文件输出(参见 How can I set the 'backend' in matplotlib in Python?)。

然后,在创建 setup.py 文件后,只需指定 Apache Beam 管道参数:

import apache_beam as beam
p = beam.Pipeline("DataFlowRunner", argv=[
'--setup_file', './setup.py',
# put other parameters here
])

通用 setup.py 文件:

import sys
import os
import logging
import subprocess
import pickle

import setuptools
import distutils

from setuptools.command.install import install as _install



class install(_install):  # pylint: disable=invalid-name
    def run(self):
        self.run_command('CustomCommands')
        _install.run(self)

CUSTOM_COMMANDS = [
    ['pip', 'install', 'matplotlib==2.0.2'],
]


class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        logging.info('Running command: %s' % command_list)
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        logging.info('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = [

]


setuptools.setup(
    name='name',
    version='1.0.0',
    description='DataFlow worker',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'install': install,
        'CustomCommands': CustomCommands,
        }
    )