Launching Cloud Dataflow from Cloud Functions

How do I launch a Cloud Dataflow job from a Google Cloud Function? I would like to use Google Cloud Functions as a mechanism for enabling cross-service composition.

I've included a very basic example using the WordCount sample below. Note that you will need to include a copy of the java binary in your Cloud Function deployment, since it is not in the default environment. Likewise, you will need to package your deploy jar with the Cloud Function as well; a sketch of one possible deployment layout follows the example.

module.exports = {
  wordcount: function (context, data) {
    const spawn = require('child_process').spawn;
    const child = spawn(
            'jre1.8.0_73/bin/java',
            ['-cp',
             'MY_JAR.jar',
             'com.google.cloud.dataflow.examples.WordCount',
             '--jobName=fromACloudFunction',
             '--project=MY_PROJECT',
             '--runner=BlockingDataflowPipelineRunner',
             '--stagingLocation=gs://STAGING_LOCATION',
             '--inputFile=gs://dataflow-samples/shakespeare/*',
             '--output=gs://OUTPUT_LOCATION'
            ],
            { cwd: __dirname });

    child.stdout.on('data', function(data) {
      console.log('stdout: ' + data);
    });
    child.stderr.on('data', function(data) {
      console.log('error: ' + data);
    });
    child.on('close', function(code) {
      console.log('closing code: ' + code);
      // Complete the function only after the java process exits, so the
      // function instance is not torn down while the job is still being submitted.
      context.success();
    });
  }
}
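
A minimal sketch of one possible deployment layout for the function above (the file names are just the placeholders already used in the example, not a required structure):

my-function/
├── index.js          (the Cloud Function above)
├── package.json
├── MY_JAR.jar        (the Dataflow deploy jar)
└── jre1.8.0_73/      (a copy of the JRE, since java is not in the default environment)
    └── bin/java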

You could further enhance this example by using the non-blocking runner and having the function return the job ID, so that you can poll for job completion separately; a sketch of that polling step follows below. This pattern should also work for other SDKs, as long as their dependencies can be packaged into the Cloud Function.
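
As a rough illustration of that polling step, the sketch below uses the googleapis client and Application Default Credentials to read a job's currentState via the Dataflow v1b3 projects.jobs.get method; projectId and jobId are stand-ins for the values your launching function would return:

const { google } = require('googleapis');

async function getJobState(projectId, jobId) {
  // Application Default Credentials scoped for the Cloud Platform APIs.
  const auth = await google.auth.getClient({
    scopes: ['https://www.googleapis.com/auth/cloud-platform']
  });
  const dataflow = google.dataflow({ version: 'v1b3', auth });

  // Returns e.g. JOB_STATE_RUNNING, JOB_STATE_DONE, or JOB_STATE_FAILED.
  const res = await dataflow.projects.jobs.get({ projectId, jobId });
  return res.data.currentState;
}

You could call this periodically (for example from another function or a scheduler) until the state reaches JOB_STATE_DONE or JOB_STATE_FAILED.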

The best way is to launch through a Cloud Function, but be careful: if you trigger the Cloud Function from Google Cloud Storage, then a Dataflow job will be launched for every uploaded file (a sketch of one way to guard against that follows the example below).

const { google } = require('googleapis');

const templatePath = "gs://template_dir/df_template";
const project = "<project_id>";
const tempLoc = "gs://tempLocation/";
// Scope used when creating scoped credentials below; cloud-platform covers the Dataflow API.
const authScope = ['https://www.googleapis.com/auth/cloud-platform'];

exports.PMKafka = (data, context, callback) => {
    const file = data;

    console.log(`Event ${context.eventId}`);
    console.log(`Event Type: ${context.eventType}`);
    console.log(`Bucket Name: ${file.bucket}`);
    console.log(`File Name: ${file.name}`);
    console.log(`Metageneration: ${file.metageneration}`);
    console.log(`Created: ${file.timeCreated}`);
    console.log(`Updated: ${file.updated}`);
    console.log(`Uploaded File Name - gs://${file.bucket}/${file.name}`);

    google.auth.getApplicationDefault(function (err, authClient, projectId) {
        if (err) {
            throw err;
        }

        if (authClient.createScopedRequired && authClient.createScopedRequired()) {
            authClient = authClient.createScoped(authScope);
        }

        const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

        var inputDict = {
            inputFile: `gs://${file.bucket}/${file.name}`
            // ... <other_runtime_parameters> for the template go here ...
        };

        var env = {
            tempLocation: tempLoc
        };

        var resource_opts = {
            parameters: inputDict,
            environment: env,
            // Strip every ':' and '.' from the timestamp so the generated job name only
            // contains characters Dataflow accepts; config.jobNamePrefix is assumed to be
            // defined elsewhere in the function's own configuration.
            jobName: config.jobNamePrefix + "-" + new Date().toISOString().toLowerCase().replace(/[:.]/g, "-")
        };

        var opts = {
            gcsPath: templatePath,
            projectId: project,
            resource: resource_opts
        };

        console.log(`Dataflow Run Time Options - ${JSON.stringify(opts)}`)

        dataflow.projects.templates.launch(opts, function (err, response) {
            if (err) {
                console.error("problem running dataflow template, error was: ", err);
                // "slack" is the author's own notification helper, defined elsewhere.
                slack.publishMessage(null, null, false, err);
                return callback(err);
            }
            console.log("Dataflow template response: ", response);
            var jobid = response["data"]["job"]["id"];
            console.log("Dataflow Job ID: ", jobid);
            // Signal completion only after the launch request has returned, so the
            // background function is not terminated while the request is in flight.
            callback();
        });
    });
};
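
To avoid launching a template run for every object written to the bucket, you could gate the launch on the object's name before calling the API. A minimal sketch, assuming (purely for illustration) that only .csv files under an input/ prefix should trigger a job:

exports.PMKafka = (data, context, callback) => {
    const file = data;

    // Hypothetical guard: ignore anything that is not a CSV under "input/".
    if (!file.name.startsWith('input/') || !file.name.endsWith('.csv')) {
        console.log(`Skipping gs://${file.bucket}/${file.name}: does not match trigger criteria`);
        return callback();
    }

    // ... launch the template exactly as in the example above ...
};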