数据流管道 python 依赖安装但导入失败

dataflow pipeline python depency installs but fails to import

我有一个简单的数据流管道,运行在我的本地计算机上成功运行:

import argparse
import logging
import ast
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery


def parse_args_set_logging(argv=None):
    """
    parse command line arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose',
                        action='store_true',
                        help='set the logging level to debug')
    parser.add_argument('--topic',
                        default=<my topic>,
                        help='GCP pubsub topic to subscribe to')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # set logging level
    logging.basicConfig()
    if known_args.verbose:
        logging.getLogger().setLevel(logging.INFO)

    return known_args, pipeline_args


class formatForBigQueryDoFn(beam.DoFn):
    def record_handler(self, data):
        """
        Build a dictionary ensuring format matches BigQuery table schema
        """
        return {
            "uid": data['uid'],
            "interaction_type": data['interaction_type'],
            "interaction_asset_id": data['interaction_asset_id'],
            "interaction_value": data['interaction_value'],
            "timestamp": data['timestamp'],
        }

    def process(self, element):

        # extract data from the PubsubMessage python object and convert to python dict
        data = ast.literal_eval(element.data)
        logging.info("ELEMENT OBJECT: {}".format(data))

        # format the firestore timestamp for bigquery
        data['timestamp'] = data['timestamp']['_seconds']

        # construct the data for bigquery
        result = self.record_handler(data)
        return [result]


if __name__ == '__main__':
    known_args, pipeline_args = parse_args_set_logging()

    # create a pipeline object
    pipeline_options = GoogleCloudOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    # create a PCollection from the GCP pubsub topic
    inputCollection = p | beam.io.ReadFromPubSub(
        topic=known_args.topic,
        # id_label='id',  # unique identifier in each record to be processed
        with_attributes=True,  # output PubsubMessage objects
    )

    # chain together multiple transform methods, to create a new PCollection
    OutputCollection = inputCollection | beam.ParDo(formatForBigQueryDoFn())

    # write the resulting PCollection to BigQuery
    table_spec = <my table spec>
    table_schema = 'uid:STRING, interaction_type:STRING, interaction_asset_id:STRING, interaction_value:STRING, timestamp:TIMESTAMP'

    OutputCollection | beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

    # run the pipeline
    result = p.run().wait_until_finish()

我正尝试使用 GCP 数据流 运行 这段代码。为此,我需要安装 python 依赖项 AST。我尝试创建 requirements.txt 并使用 --requirements_file 参数,但没有成功。我现在正在尝试 setup.py。关注了 the docs,我的 setup.py 看起来像这样:

import setuptools

setuptools.setup(
    name='pubsub_to_BQ',
    version='1.0',
    install_requires=[
        'AST'
    ],
    packages=setuptools.find_packages(),
)

我运行正在 GCP 上使用以下命令:

python main.py --runner DataflowRunner \
               --setup_file ./setup.py \
               --project <myproject> \
               --temp_location <my bucket> \
               --verbose \
               --streaming \
               --job_name bigqueryinteractions

但是,当管道处理数据时出现以下错误:

File "main.py", line 47, in process NameError: global name 'ast' is not defined [while running 'generatedPtransform-54']

我该如何解决这个问题?

我找到了一个解决方法,使用 json 库而不是 ast。我仍然想知道我在这里做错了什么。

AFAIK 如果您通过 shell 命令行指定 setup.py 那么您应该使用绝对路径,也可以使用 Dataflow 尝试布尔标志 save_main_session 因为没有它您的部署模板不会解析 setup.py.

中指定的依赖项

对于管道来说不是动态的参数可以在管道构造期间解决。

例如,您可以硬编码一些您需要始终传递的不变参数,因此您只需要指定每次执行都会更改的参数:

known_args, pipe_args = parser.parse_known_args()
standard_pipe_arg = ['--save_main_session', 'setup_file=./setup.py', '--streaming']
pipe_opts = PipelineOptions(pipe_args + standard_pipe_arg)