Apache Spark GCS 连接器的速率限制

Rate limit with Apache Spark GCS connector

我在 Google Compute Engine 集群上使用 Spark 与 Google 云存储连接器(而不是 HDFS,如 recommended),并获得了很多 "rate limit"错误,如下:

java.io.IOException: Error inserting: bucket: *****, object: *****
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.wrapException(GoogleCloudStorageImpl.java:1600)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.run(GoogleCloudStorageImpl.java:475)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests
{
  "code" : 429,
  "errors" : [ {
    "domain" : "usageLimits",
    "message" : "The total number of changes to the object ***** exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
    "reason" : "rateLimitExceeded"
  } ],
  "message" : "The total number of changes to the object ***** exceeds the rate limit. Please reduce the rate of create, update, and delete requests."
}
  at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.run(GoogleCloudStorageImpl.java:472)
  ... 3 more

谢谢!

您是否尝试设置 spark.local.dir 配置参数并将该 tmp space 的磁盘(最好是 SSD)附加到您的 Google Compute Engine 实例?

https://spark.apache.org/docs/1.2.0/configuration.html

您无法更改项目的速率限制,一旦达到限制,您必须使用退避算法。由于您提到的大多数 reads/writes 用于 tmp 文件,请尝试将 Spark 配置为使用本地磁盘。

不幸的是,当设置为 DEFAULT_FS 时,GCS 的使用可能会以很高的目录对象创建率弹出,无论是将其用于中间目录还是最终 input/output 目录。特别是对于使用 GCS 作为最终输出目录,很难应用任何 Spark 端解决方法来减少冗余目录创建请求的速率。

好消息是,这些目录请求中的大多数确实是多余的,只是因为系统习惯于能够本质上 "mkdir -p",并且如果目录已经存在,则便宜地 return 为真。在我们的例子中,可以在 GCS 连接器端修复它,方法是捕获这些错误,然后检查该目录是否确实是由其他工作人员在竞争条件下创建的。

现在应该用 https://github.com/GoogleCloudPlatform/bigdata-interop/commit/141b1efab9ef23b6b5f5910d8206fcbc228d2ed7

解决这个问题

要测试,只需运行:

git clone https://github.com/GoogleCloudPlatform/bigdata-interop.git
cd bigdata-interop
mvn -P hadoop1 package
# Or or Hadoop 2
mvn -P hadoop2 package

您应该会找到可供使用的文件 "gcs/target/gcs-connector-*-shaded.jar"。要将其插入 bdutil,只需 gsutil cp gcs/target/gcs-connector-*shaded.jar gs://<your-bucket>/some-path/ 然后编辑 bdutil/bdutil_env.sh for Hadoop 1 或 bdutil/hadoop2_env.sh 以更改:

GCS_CONNECTOR_JAR='https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-1.4.1-hadoop2.jar'

改为指向您的 gs://<your-bucket>/some-path/ 路径; bdutil 会自动检测到您正在使用 gs:// 前缀 URI,并将在部署期间做正确的事情。

如果它解决了您的问题,请告诉我们!