运行 Google 来自 Google App Engine 应用的数据流管道?
Running Google Dataflow pipeline from a Google App Engine app?
我正在使用 DataflowPipelineRunner 创建数据流作业。我尝试了以下场景。
- 不指定任何机器类型
- 配g1小机
- 与 n1-highmem-2
在上述所有场景中,输入是来自 GCS 的文件,该文件非常小(KB 大小),输出是大查询 table。
我在所有情况下都遇到内存不足错误
我编译后的代码大小是94mb。我只尝试字数统计示例,它没有读取任何输入(它在作业开始前失败)。请帮助我理解为什么会出现此错误。
注意:我正在使用appengine来启动作业。
注意:相同的代码适用于 beta 版本 0.4.150414
编辑 1
根据答案中的建议尝试了以下方法,
- 已从自动缩放切换到基本缩放。
- 使用的机型B2提供256MB内存
经过这些配置后,Java堆内存问题就解决了。但是它试图将一个 jar 上传到超过 10Mb 的暂存位置,因此它失败了。
它记录了以下异常
com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1195)
我尝试直接上传 jar 文件 - appengine-api-1.0-sdk-1.9.20.jar,它仍然尝试上传这个 jar appengine-api-L4wtoWwoElWmstI1Ia93cg.jar。
我不知道它是什么罐子。想知道这个罐子是什么吗?
请帮我解决这个问题。
简短的回答是,如果您在 Managed VM you will not encounter the AppEngine sandbox limits (OOM when using a F1 or B1 instance class 上使用 AppEngine,执行时间限制问题,白名单 JRE classes)。如果你真的想运行在App Engine沙箱内,那么你对Dataflow SDK的使用最符合AppEngine沙箱的限制。下面我将解释常见问题以及人们为遵守 AppEngine 沙盒限制所做的工作。
Dataflow SDK 需要一个 AppEngine 实例 class,该实例具有足够的内存来执行用户应用程序以构建管道、暂存任何资源以及将作业描述发送到 Dataflow 服务。通常我们已经看到用户需要使用内存超过 128mb 的 instance class 才能看不到 OOM 错误。
如果您的应用程序所需的资源已经暂存,通常构建管道并将其提交到数据流服务通常需要不到几秒钟的时间。将 JAR 和任何其他资源上传到 GCS 可能需要 60 秒以上的时间。这可以通过预先将 JAR 预暂存到 GCS 来手动解决(如果 Dataflow SDK 检测到它们已经存在,将跳过再次暂存它们)或使用 task queue 获得 10 分钟的限制(请注意,对于大型应用程序,10 分钟可能不足以展示您的所有资源)。
最后,在 AppEngine 沙箱环境中,您和您的所有依赖项仅限于在 JRE 中使用 whitelisted classes,否则您将得到如下异常:
java.lang.SecurityException:
java.lang.IllegalAccessException: YYY is not allowed on ZZZ
...
编辑 1
我们对 class 路径上的 jars 内容进行哈希处理,并使用修改后的文件名将它们上传到 GCS。 AppEngine 运行 是一个带有自己的 JAR 的沙盒环境,appengine-api-L4wtoWwoElWmstI1Ia93cg.jar 指的是 appengine-api.jar 这是沙盒环境添加的 jar。从我们的PackageUtil#getUniqueContentName(...)可以看出,我们只是在.jar.
之前追加了-$HASH
我们正在努力解决您看到 RequestPayloadToLarge 异常的原因,目前建议您设置 filesToStage 选项和过滤器取出不需要执行数据流的罐子来解决您面临的问题。您可以看到我们如何构建文件以使用 DataflowPipelineRunner#detectClassPathResourcesToStage(...).
我对 10MB 的限制有同样的问题。我所做的是过滤掉大于该限制的 JAR 文件(而不是特定文件),然后将 DataflowPipelineOptions
中的重命名文件设置为 setFilesToStage
.
所以我只是从 Dataflow SDK 中复制了方法 detectClassPathResourcesToStage
并稍作修改:
private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB
protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
if (!(classLoader instanceof URLClassLoader)) {
String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
throw new IllegalArgumentException(message);
}
List<String> files = new ArrayList<>();
for (URL url : ((URLClassLoader) classLoader).getURLs()) {
try {
File file = new File(url.toURI());
if (file.length() < FILE_BYTES_THRESHOLD) {
files.add(file.getAbsolutePath());
}
} catch (IllegalArgumentException | URISyntaxException e) {
String message = String.format("Unable to convert url (%s) to file.", url);
throw new IllegalArgumentException(message, e);
}
}
return files;
}
然后当我创建 DataflowPipelineOptions
:
DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));
这是一个 Helder's 10MB-filtering 的版本,它将适应 DataflowPipelineOptions
的默认文件暂存行为,即使它在 SDK 的未来版本中发生变化。
它没有复制逻辑,而是将 DataflowPipelineOptions
的一次性副本传递给 DataflowPipelineRunner
以查看它会暂存哪些文件,然后删除任何太大的文件。
请注意,此代码假定您已经定义了一个名为 MyOptions
的自定义 PipelineOptions
class,以及一个名为 logger
的 java.util.Logger
字段。
// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;
/**
* Returns the list of .jar/etc files to stage based on the
* Options, filtering out any files that are too large for
* DataflowPipelineRunner.
*
* <p>If this accidentally filters out a necessary file, it should
* be obvious when the pipeline fails with a runtime link error.
*/
private static ImmutableList<String> getFilesToStage(MyOptions options) {
// Construct a throw-away runner with a copy of the Options to see
// which files it would have wanted to stage. This could be an
// explicitly-specified list of files from the MyOptions param, or
// the default list of files determined by DataflowPipelineRunner.
List<String> baseFiles;
{
DataflowPipelineOptions tmpOptions =
options.cloneAs(DataflowPipelineOptions.class);
// Ignore the result; we only care about how fromOptions()
// modifies its parameter.
DataflowPipelineRunner.fromOptions(tmpOptions);
baseFiles = tmpOptions.getFilesToStage();
// Some value should have been set.
Preconditions.checkNotNull(baseFiles);
}
// Filter out any files that are too large to stage.
ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
for (String file : baseFiles) {
long size = new File(file).length();
if (size < MAX_STAGED_FILE_SIZE_BYTES) {
filteredFiles.add(file);
} else {
logger.info("Not staging large file " + file + ": length " + size
+ " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
}
}
return filteredFiles.build();
}
/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
throws IOException, InterruptedException {
// DataflowPipelineRunner can't stage large files;
// remove any from the list.
DataflowPipelineOptions dpOpts =
options.as(DataflowPipelineOptions.class);
dpOpts.setFilesToStage(getFilesToStage(options));
// Run the pipeline as usual using "options".
// ...
}
我正在使用 DataflowPipelineRunner 创建数据流作业。我尝试了以下场景。
- 不指定任何机器类型
- 配g1小机
- 与 n1-highmem-2
在上述所有场景中,输入是来自 GCS 的文件,该文件非常小(KB 大小),输出是大查询 table。
我在所有情况下都遇到内存不足错误
我编译后的代码大小是94mb。我只尝试字数统计示例,它没有读取任何输入(它在作业开始前失败)。请帮助我理解为什么会出现此错误。
注意:我正在使用appengine来启动作业。
注意:相同的代码适用于 beta 版本 0.4.150414
编辑 1
根据答案中的建议尝试了以下方法,
- 已从自动缩放切换到基本缩放。
- 使用的机型B2提供256MB内存
经过这些配置后,Java堆内存问题就解决了。但是它试图将一个 jar 上传到超过 10Mb 的暂存位置,因此它失败了。
它记录了以下异常
com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory.run(ApiProxyImpl.java:1195)
我尝试直接上传 jar 文件 - appengine-api-1.0-sdk-1.9.20.jar,它仍然尝试上传这个 jar appengine-api-L4wtoWwoElWmstI1Ia93cg.jar。 我不知道它是什么罐子。想知道这个罐子是什么吗?
请帮我解决这个问题。
简短的回答是,如果您在 Managed VM you will not encounter the AppEngine sandbox limits (OOM when using a F1 or B1 instance class 上使用 AppEngine,执行时间限制问题,白名单 JRE classes)。如果你真的想运行在App Engine沙箱内,那么你对Dataflow SDK的使用最符合AppEngine沙箱的限制。下面我将解释常见问题以及人们为遵守 AppEngine 沙盒限制所做的工作。
Dataflow SDK 需要一个 AppEngine 实例 class,该实例具有足够的内存来执行用户应用程序以构建管道、暂存任何资源以及将作业描述发送到 Dataflow 服务。通常我们已经看到用户需要使用内存超过 128mb 的 instance class 才能看不到 OOM 错误。
如果您的应用程序所需的资源已经暂存,通常构建管道并将其提交到数据流服务通常需要不到几秒钟的时间。将 JAR 和任何其他资源上传到 GCS 可能需要 60 秒以上的时间。这可以通过预先将 JAR 预暂存到 GCS 来手动解决(如果 Dataflow SDK 检测到它们已经存在,将跳过再次暂存它们)或使用 task queue 获得 10 分钟的限制(请注意,对于大型应用程序,10 分钟可能不足以展示您的所有资源)。
最后,在 AppEngine 沙箱环境中,您和您的所有依赖项仅限于在 JRE 中使用 whitelisted classes,否则您将得到如下异常:
java.lang.SecurityException:
java.lang.IllegalAccessException: YYY is not allowed on ZZZ
...
编辑 1
我们对 class 路径上的 jars 内容进行哈希处理,并使用修改后的文件名将它们上传到 GCS。 AppEngine 运行 是一个带有自己的 JAR 的沙盒环境,appengine-api-L4wtoWwoElWmstI1Ia93cg.jar 指的是 appengine-api.jar 这是沙盒环境添加的 jar。从我们的PackageUtil#getUniqueContentName(...)可以看出,我们只是在.jar.
之前追加了-$HASH我们正在努力解决您看到 RequestPayloadToLarge 异常的原因,目前建议您设置 filesToStage 选项和过滤器取出不需要执行数据流的罐子来解决您面临的问题。您可以看到我们如何构建文件以使用 DataflowPipelineRunner#detectClassPathResourcesToStage(...).
我对 10MB 的限制有同样的问题。我所做的是过滤掉大于该限制的 JAR 文件(而不是特定文件),然后将 DataflowPipelineOptions
中的重命名文件设置为 setFilesToStage
.
所以我只是从 Dataflow SDK 中复制了方法 detectClassPathResourcesToStage
并稍作修改:
private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB
protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
if (!(classLoader instanceof URLClassLoader)) {
String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
throw new IllegalArgumentException(message);
}
List<String> files = new ArrayList<>();
for (URL url : ((URLClassLoader) classLoader).getURLs()) {
try {
File file = new File(url.toURI());
if (file.length() < FILE_BYTES_THRESHOLD) {
files.add(file.getAbsolutePath());
}
} catch (IllegalArgumentException | URISyntaxException e) {
String message = String.format("Unable to convert url (%s) to file.", url);
throw new IllegalArgumentException(message, e);
}
}
return files;
}
然后当我创建 DataflowPipelineOptions
:
DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));
这是一个 Helder's 10MB-filtering DataflowPipelineOptions
的默认文件暂存行为,即使它在 SDK 的未来版本中发生变化。
它没有复制逻辑,而是将 DataflowPipelineOptions
的一次性副本传递给 DataflowPipelineRunner
以查看它会暂存哪些文件,然后删除任何太大的文件。
请注意,此代码假定您已经定义了一个名为 MyOptions
的自定义 PipelineOptions
class,以及一个名为 logger
的 java.util.Logger
字段。
// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;
/**
* Returns the list of .jar/etc files to stage based on the
* Options, filtering out any files that are too large for
* DataflowPipelineRunner.
*
* <p>If this accidentally filters out a necessary file, it should
* be obvious when the pipeline fails with a runtime link error.
*/
private static ImmutableList<String> getFilesToStage(MyOptions options) {
// Construct a throw-away runner with a copy of the Options to see
// which files it would have wanted to stage. This could be an
// explicitly-specified list of files from the MyOptions param, or
// the default list of files determined by DataflowPipelineRunner.
List<String> baseFiles;
{
DataflowPipelineOptions tmpOptions =
options.cloneAs(DataflowPipelineOptions.class);
// Ignore the result; we only care about how fromOptions()
// modifies its parameter.
DataflowPipelineRunner.fromOptions(tmpOptions);
baseFiles = tmpOptions.getFilesToStage();
// Some value should have been set.
Preconditions.checkNotNull(baseFiles);
}
// Filter out any files that are too large to stage.
ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
for (String file : baseFiles) {
long size = new File(file).length();
if (size < MAX_STAGED_FILE_SIZE_BYTES) {
filteredFiles.add(file);
} else {
logger.info("Not staging large file " + file + ": length " + size
+ " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
}
}
return filteredFiles.build();
}
/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
throws IOException, InterruptedException {
// DataflowPipelineRunner can't stage large files;
// remove any from the list.
DataflowPipelineOptions dpOpts =
options.as(DataflowPipelineOptions.class);
dpOpts.setFilesToStage(getFilesToStage(options));
// Run the pipeline as usual using "options".
// ...
}