Bare bones GAE app with Dataflow pipeline fails

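I created a bare-bones Google App Engine standard application with a Dataflow pipeline using Eclipse IDE for Java Developers 4.7.3. When I deploy the application and invoke the web service, I get java.lang.RuntimeException: Error while staging packages. What am I doing wrong? Here are the steps to reproduce the error: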

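  1. Create a new Google App Engine project (e.g. pushpin-dataflow-test) using https://appengine.google.com
  2. Enable the Dataflow API for the project using https://console.developers.google.com/apis/api/dataflow.googleapis.com/overview
  3. Create a storage bucket for the project (e.g. pushpin-dataflow-test) using https://console.cloud.google.com/storage/browser
  4. Run Eclipse
  5. From the Google Cloud Platform menu, select Create New Project > Google App Engine Standard Java Project
  6. In the Project name box, type a project name (e.g. DataflowTest)
  7. In the Java version box, select Java 8, Servlet 3.1
  8. Check the Create as Maven project box
  9. In the Group ID box, enter a group ID (e.g. com.pushpin.dataflowtest)
  10. In the Artifact ID box, enter an artifact ID (e.g. dataflowtest)
  11. Click Finish
  12. Update the pom.xml file to match the one below
  13. Update the HelloAppEngine.java file to match the one below
  14. From the Google Cloud Platform menu, select Deploy to App Engine Standard
  15. Click the project (e.g. pushpin-dataflow-test)
  16. Click Deploy
  17. Invoke the servlet (e.g. by opening https://pushpin-dataflow-test.appspot.com/hello)
  18. Notice the 500 server error
  19. Review the logs
  20. Notice java.lang.RuntimeException: Error while staging packages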

Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <packaging>war</packaging>
  <version>0.1.0-SNAPSHOT</version>
  <groupId>com.pushpin.dataflowtest3</groupId>
  <artifactId>dataflowtest3</artifactId>
  <dependencies>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>javax.servlet.jsp-api</artifactId>
      <version>2.3.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.appengine</groupId>
      <artifactId>appengine-api-1.0-sdk</artifactId>
      <version>1.9.60</version>
    </dependency>
    <dependency>
      <groupId>jstl</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.4.0</version>
    </dependency>
  </dependencies>

  <build>
    <!-- for hot reload of the web application-->
    <outputDirectory>${project.build.directory}/${project.build.finalName}/WEB-INF/classes</outputDirectory>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>versions-maven-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>display-dependency-updates</goal>
              <goal>display-plugin-updates</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>com.google.cloud.tools</groupId>
        <artifactId>appengine-maven-plugin</artifactId>
        <version>1.3.2</version>
      </plugin>
    </plugins>
  </build>
</project>

Here is my HelloAppEngine.java file:

package com.pushpin.dataflowtest3;

import java.io.IOException;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

@WebServlet(
    name = "HelloAppEngine",
    urlPatterns = {"/hello"}
)
public class HelloAppEngine extends HttpServlet {

  @Override
  public void doGet(HttpServletRequest request, HttpServletResponse response) 
      throws IOException {

    DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    dataflowOptions.setGcpTempLocation("gs://pushpin-dataflow-test/gcp-temp");
    dataflowOptions.setProject("pushpin-dataflow-test");
    dataflowOptions.setRunner(DataflowRunner.class);
    dataflowOptions.setTempLocation("gs://pushpin-dataflow-test/temp");
    Pipeline pipeline = Pipeline.create(dataflowOptions);
    pipeline.run();

  }
}

Here is the stack trace:

java.lang.RuntimeException: Error while staging packages
    at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:396)
    at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:273)
    at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:76)
    at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:64)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:661)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.pushpin.dataflowtest3.HelloAppEngine.doGet(HelloAppEngine.java:33)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
    at com.google.apphosting.utils.servlet.ParseBlobUploadFilter.doFilter(ParseBlobUploadFilter.java:125)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.runtime.jetty9.SaveSessionFilter.doFilter(SaveSessionFilter.java:37)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.utils.servlet.TransactionCleanupFilter.doFilter(TransactionCleanupFilter.java:48)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:297)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:534)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:202)
    at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
    at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:108)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:680)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:642)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:612)
    at com.google.apphosting.runtime.JavaRuntime$NullSandboxRequestRunnable.run(JavaRuntime.java:806)
    at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:274)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not stage /base/java8_runtime/appengine-api.jar to gs://pushpin-dataflow-test-3a/gcp-temp/staging/appengine-api-5oFhZxRcq0C-va8sAIoBcg.jar
    at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously(PackageUtil.java:193)
    at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stagePackage(PackageUtil.java:174)
    at org.apache.beam.sdk.util.MoreFutures.lambda$supplyAsync$0(MoreFutures.java:101)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1249)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1243)
    at java.lang.Thread.run(Thread.java:745)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThread.run(ApiProxyImpl.java:1210)
Caused by: java.lang.IllegalStateException: Each request cannot exceed 50 active threads.
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThread.start(ApiProxyImpl.java:1197)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.initialize(AbstractGoogleAsyncWriteChannel.java:333)
    at org.apache.beam.sdk.util.GcsUtil.create(GcsUtil.java:450)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.create(GcsFileSystem.java:107)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.create(GcsFileSystem.java:57)
    at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:248)
    at org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackage(PackageUtil.java:246)
    at org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackageWithRetry(PackageUtil.java:206)
    at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously(PackageUtil.java:190)
    ... 10 more

This appears to be related to this App Engine limit: https://cloud.google.com/appengine/docs/standard/java/javadoc/com/google/appengine/api/ThreadManager#method.detail
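
To illustrate how the innermost "Caused by" can arise, here is a minimal local sketch, not App Engine or Beam code. In the stack trace, AbstractGoogleAsyncWriteChannel.initialize submits work to an executor for each staged file, and the failure surfaces from ThreadPoolExecutor.addWorker. The sketch assumes a hypothetical CappedThreadFactory that stands in for the per-request thread factory and refuses to create more than a fixed number of threads. It reproduces the same failure shape with a plain cached thread pool:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadCapDemo {

  // Hypothetical stand-in for a per-request thread factory with a hard cap.
  // This is an illustration only; it is not how App Engine implements the limit.
  static final class CappedThreadFactory implements ThreadFactory {
    private final int maxThreads;
    private final AtomicInteger created = new AtomicInteger();

    CappedThreadFactory(int maxThreads) {
      this.maxThreads = maxThreads;
    }

    @Override
    public Thread newThread(Runnable r) {
      if (created.incrementAndGet() > maxThreads) {
        throw new IllegalStateException(
            "Each request cannot exceed " + maxThreads + " active threads.");
      }
      Thread t = new Thread(r);
      t.setDaemon(true);
      return t;
    }
  }

  // Submits long-running tasks (think: one upload per staged JAR) to a cached
  // pool and returns how many were accepted before the cap was hit.
  static int submitUntilRejected(int cap, int tasks) {
    ExecutorService pool = Executors.newCachedThreadPool(new CappedThreadFactory(cap));
    CountDownLatch release = new CountDownLatch(1);
    int accepted = 0;
    try {
      for (int i = 0; i < tasks; i++) {
        pool.submit(() -> {
          try {
            release.await(); // keep the thread busy, like an in-flight upload
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        accepted++;
      }
    } catch (IllegalStateException e) {
      // The factory refused to create another thread; stop submitting.
    } finally {
      release.countDown();
      pool.shutdown();
    }
    return accepted;
  }

  public static void main(String[] args) {
    System.out.println(submitUntilRejected(50, 60));
  }
}
```

Because every task stays busy, a cached pool needs one new thread per submission, so any workload with more concurrent uploads than the cap fails partway through. That would be consistent with staging a large classpath from inside a single request.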


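Generally speaking, I wouldn't look to invoke Dataflow from within an App Engine application... A similar pattern you might want to look into is Dataflow templates: https://cloud.google.com/dataflow/docs/templates/overview

(i.e. the ability to invoke/run a reusable Dataflow job with only minor changes to configuration/input)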
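
As a rough sketch of what the template approach could look like from the servlet side: instead of building and staging the pipeline inside the request, the app would send one HTTP POST to the Dataflow templates:launch REST method. The code below only builds the URL and JSON body and does not send anything. The template path, job name, and the inputFile parameter are hypothetical placeholders, and a real call would also need an OAuth access token in the Authorization header.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.Map;

class TemplateLaunchRequest {

  // URL of the templates:launch method for a template already staged in GCS.
  static String launchUrl(String projectId, String templateGcsPath) {
    try {
      return "https://dataflow.googleapis.com/v1b3/projects/" + projectId
          + "/templates:launch?gcsPath=" + URLEncoder.encode(templateGcsPath, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }

  // JSON body: job name, template parameters, and a temp location.
  static String launchBody(String jobName, String tempLocation, Map<String, String> parameters) {
    StringBuilder params = new StringBuilder();
    for (Map.Entry<String, String> e : parameters.entrySet()) {
      if (params.length() > 0) {
        params.append(',');
      }
      params.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
    }
    return "{\"jobName\":" + quote(jobName)
        + ",\"parameters\":{" + params + "}"
        + ",\"environment\":{\"tempLocation\":" + quote(tempLocation) + "}}";
  }

  // Minimal JSON string quoting (backslash and double quote only).
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  public static void main(String[] args) {
    // Hypothetical template location and parameter, for illustration only.
    System.out.println(launchUrl("pushpin-dataflow-test",
        "gs://pushpin-dataflow-test/templates/my-template"));
    System.out.println(launchBody("hello-job", "gs://pushpin-dataflow-test/temp",
        Collections.singletonMap("inputFile", "gs://pushpin-dataflow-test/in.txt")));
  }
}
```

With this shape, the request thread makes a single outbound call and no JARs are staged at request time, since the template was staged once when it was created.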