How to trigger a Cloud Dataflow pipeline job from a Cloud Function in Java?

I need to trigger a Cloud Dataflow pipeline from a Cloud Function, and the Cloud Function must be written in Java. The trigger for the Cloud Function is a Google Cloud Storage finalize/create event, i.e. when a file is uploaded to a GCS bucket, the Cloud Function must trigger the Dataflow pipeline.

When I create a (batch) Dataflow pipeline and execute it directly, it creates a Dataflow pipeline template and a Dataflow job as expected.

But when I create the Cloud Function in Java and upload a file, its status just shows "ok", and it does not trigger the Dataflow pipeline.

Cloud Function:

package com.example;

import com.example.Example.GCSEvent;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.CreateJobFromTemplateRequest;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.logging.Logger;

public class Example implements BackgroundFunction<GCSEvent> {
    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws IOException, GeneralSecurityException {
        logger.info("Event: " + context.eventId());
        logger.info("Event Type: " + context.eventType());


        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);


        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
                .setApplicationName("Google Dataflow function Demo")
                .build();

        String projectId = "my-project-id";


        RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
        runtimeEnvironment.setBypassTempDirValidation(false);
        runtimeEnvironment.setTempLocation("gs://my-dataflow-job-bucket/tmp");
        CreateJobFromTemplateRequest createJobFromTemplateRequest = new CreateJobFromTemplateRequest();
        createJobFromTemplateRequest.setEnvironment(runtimeEnvironment);
        createJobFromTemplateRequest.setLocation("us-central1");
        createJobFromTemplateRequest.setGcsPath("gs://my-dataflow-job-bucket-staging/templates/cloud-dataflow-template");
        createJobFromTemplateRequest.setJobName("Dataflow-Cloud-Job");
        createJobFromTemplateRequest.setParameters(new HashMap<String,String>());
        createJobFromTemplateRequest.getParameters().put("inputFile","gs://cloud-dataflow-bucket-input/*.txt");
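        // Note: templates().create(...) below only builds the request object;
        // .execute() is never called on it, so no job is actually submitted.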
        dataflowService.projects().templates().create(projectId,createJobFromTemplateRequest);

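        // Note: this exception is thrown unconditionally, so the function always
        // fails here, even though the request above was never executed.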
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }


}

pom.xml (Cloud Function):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cloudfunctions</groupId>
  <artifactId>http-function</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.target>11</maven.compiler.target>
    <maven.compiler.source>11</maven.compiler.source>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/com.google.auth/google-auth-library-credentials -->
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-credentials</artifactId>
      <version>0.21.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-dataflow</artifactId>
      <version>v1b3-rev207-1.20.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.functions</groupId>
      <artifactId>functions-framework-api</artifactId>
      <version>1.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-oauth2-http</artifactId>
      <version>0.21.1</version>
    </dependency>
  </dependencies>

  <!-- Required for Java 11 functions in the inline editor -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <excludes>
            <exclude>.google/</exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Cloud Function logs (screenshot omitted): the logs only show the function finishing with status "ok"; no Dataflow job is created.

I went through the following blog posts (added for reference), in which a Dataflow pipeline is triggered from Cloud Storage via a Cloud Function, but the code there is written in Node.js or Python, whereas my Cloud Function must be written in Java.

Via Node.js: Triggering Dataflow pipelines with Cloud Functions
https://dzone.com/articles/triggering-dataflow-pipelines-with-cloud-functions

Via Python: How to kick off a Dataflow pipeline via Cloud Functions
https://medium.com/google-cloud/how-to-kick-off-a-dataflow-pipeline-via-cloud-functions-696927975d4e

Any help on this is much appreciated.
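
Answer:

The function in the question never actually submits a job: dataflowService.projects().templates().create(...) only builds the request object, .execute() is never called on it, and the unconditional UnsupportedOperationException then aborts the function. The code below fixes this by launching a template (here the stock Word_Count template) and calling execute() on the launch request; dataflowService and projectId are set up exactly as in the question: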

RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
runtimeEnvironment.setBypassTempDirValidation(false);
runtimeEnvironment.setTempLocation("gs://karthiksfirstbucket/temp1");

LaunchTemplateParameters launchTemplateParameters = new LaunchTemplateParameters();
launchTemplateParameters.setEnvironment(runtimeEnvironment);
launchTemplateParameters.setJobName("newJob" + (new Date()).getTime());

Map<String, String> params = new HashMap<String, String>();
params.put("inputFile", "gs://karthiksfirstbucket/sample.txt");
params.put("output", "gs://karthiksfirstbucket/count1");
launchTemplateParameters.setParameters(params);

// Build the launch request, point it at the template, and submit it.
// Calling execute() is what actually creates the Dataflow job.
Dataflow.Projects.Templates.Launch launch = dataflowService.projects().templates().launch(projectId, launchTemplateParameters);
launch.setGcsPath("gs://dataflow-templates-us-central1/latest/Word_Count");
launch.execute();

The above code launches the template and executes the Dataflow pipeline. It:

  1. uses application default credentials (these can be swapped for user or service account credentials; see the sketch after this list);
  2. uses the default region (this can be changed);
  3. creates one job per HTTP trigger (the trigger type can be changed).
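
For point 1, here is a minimal, self-contained sketch of swapping in explicit service account credentials; the key file path and the single cloud-platform scope are assumptions for illustration, not part of the original code:

import com.google.api.client.http.HttpRequestInitializer;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;

public class CredentialsSketch {
    static HttpRequestInitializer serviceAccountInitializer() throws IOException {
        GoogleCredentials credentials;
        // Hypothetical key file path; inside a Cloud Function it is usually
        // simpler to keep GoogleCredentials.getApplicationDefault(), which uses
        // the function's runtime service account.
        try (FileInputStream keyStream = new FileInputStream("/secrets/key.json")) {
            credentials = ServiceAccountCredentials.fromStream(keyStream)
                    .createScoped(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));
        }
        // The adapter plugs the credentials into the Dataflow.Builder, exactly
        // where the answer's code passes its request initializer.
        return new HttpCredentialsAdapter(credentials);
    }
}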

The full code can be found here:

https://github.com/karthikeyan1127/Java_CloudFunction_DataFlow/blob/master/Hello.java

Here is my solution, using the newer Dataflow dependencies:

// Imports assumed by this snippet (they were not shown in the original answer).
// RetryHttpRequestInitializer is taken here from Beam's GCP extensions and
// ChainingHttpRequestInitializer from the bigdataoss util library; adjust the
// packages to whichever libraries your build actually provides them from.
import com.google.api.client.googleapis.apache.v2.GoogleApacheHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch;
import com.google.api.services.dataflow.model.FlexTemplateRuntimeEnvironment;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.logging.Logger;

public class Example implements BackgroundFunction<Example.GCSEvent> {
    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws Exception {
        String filename = event.name;

        logger.info("Processing file: " + filename);
        logger.info("Bucket name" + event.bucket);

        String projectId = "cedar-router-268801";
        String region = "us-central1";
        String tempLocation = "gs://cedar-router-beam-poc/temp";
        String templateLocation = "gs://cedar-router-beam-poc/template/poc-template.json";

        logger.info("path" + String.format("gs://%s/%s", event.bucket, filename));
        String scenario = filename.substring(0, 3); //it comes TWO OR ONE

        logger.info("scneario " + scenario);

        Map<String, String> params = Map.of("sourceFile", String.format("%s/%s", event.bucket, filename),
                "scenario", scenario,
                "years", "2013,2014",
                "targetFile", "gs://cedar-router-beam-poc-kms/result/testfile");
        
        extractedJob(projectId, region, tempLocation, templateLocation, params);
    }

    private static void extractedJob(String projectId,
                                     String region,
                                     String tempLocation,
                                     String templateLocation,
                                     Map<String, String> params) throws Exception {

        HttpTransport httpTransport = GoogleApacheHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = GsonFactory.getDefaultInstance();
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer httpRequestInitializer = new RetryHttpRequestInitializer(ImmutableList.of(404));
        ChainingHttpRequestInitializer chainingHttpRequestInitializer =
                new ChainingHttpRequestInitializer(new HttpCredentialsAdapter(credentials), httpRequestInitializer);

        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, chainingHttpRequestInitializer)
                .setApplicationName("Dataflow from Cloud function")
                .build();

        FlexTemplateRuntimeEnvironment runtimeEnvironment = new FlexTemplateRuntimeEnvironment();
        runtimeEnvironment.setTempLocation(tempLocation);

        LaunchFlexTemplateParameter launchFlexTemplateParameter = new LaunchFlexTemplateParameter();
        launchFlexTemplateParameter.setEnvironment(runtimeEnvironment);
        // Derive a readable name from the source file for logging (the substring
        // indices are tied to this bucket's naming convention), but use a unique
        // timestamp-based name for the job itself.
        String jobName = params.get("sourceFile").substring(34, 49).replace("_", "");
        logger.info("Job name: " + jobName);
        launchFlexTemplateParameter.setJobName("job" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));

        launchFlexTemplateParameter.setContainerSpecGcsPath(templateLocation);
        launchFlexTemplateParameter.setParameters(params);

        LaunchFlexTemplateRequest launchFlexTemplateRequest = new LaunchFlexTemplateRequest();
        launchFlexTemplateRequest.setLaunchParameter(launchFlexTemplateParameter);


        Launch launch = dataflowService.projects()
                .locations()
                .flexTemplates()
                .launch(projectId, region, launchFlexTemplateRequest);

        launch.execute();
        logger.info("running job");
    }


    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }
}

Just adapt it to your case.
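
For reference, the classes above come from several different artifacts: the Dataflow client and its model classes from a recent revision of com.google.apis:google-api-services-dataflow, GsonFactory from com.google.http-client:google-http-client-gson, RetryHttpRequestInitializer from Beam's beam-sdks-java-extensions-google-cloud-platform-core, and ChainingHttpRequestInitializer from com.google.cloud.bigdataoss:util. These artifact names are my best guess at what the original build used, so verify them against your own dependency tree.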