如何从 AppEngine PHP 应用程序将数据从 Cloud Storage 加载到 Cloud Datastore?

How do I load data from Cloud Storage to Cloud Datastore from an AppEngine PHP application?

我一直在寻找各种来源,但这个新手并不清楚。如何从 AppEngine PHP 应用程序将数据(CSV 文件)从 Cloud Storage 加载到 Cloud Datastore?我确实有一个现有的方法可以下载文件,然后将每一行作为事务加载。几百万行需要几个小时,所以这似乎不是最好的方法,并且一直在寻找更有效的方法。我感谢任何指导。

编辑这个,因为我已经切换到尝试使用远程 URL,从中将 JSON 数据从 GAE 加载到数据存储中。代码不工作,虽然我不知道为什么(还):

<?php

require 'vendor/autoload.php';
use Google\Auth\ApplicationDefaultCredentials;
use Google\Cloud\Datastore\DatastoreClient;

/**
 * Create a new product with a given SKU.
 *
 * @param DatastoreClient $datastore
 * @param $sku
 * @param $product
 * @return Google\Cloud\Datastore\Entity
 */
function add_product(DatastoreClient $datastore, $sku, $product)
{
    $productKey = $datastore->key('SKU', $sku);
    $product = $datastore->entity(
        $productKey,
        [
            'created' => new DateTime(),
            'name' => strtolower($product)
        ]);
    $datastore->upsert($product);
    return $product;
}

/*
  Load Cloud DataStore Kind from remote URL

  @param $projectId
  @param $url
*/
function load_datastore($projectId, $url) {
  // Create Datastore client
  $datastore = new DatastoreClient(['projectId' => $projectId]);

  // Enable `allow_url_fopen` to allow reading file from URL
  ini_set("allow_url_fopen", 1);

  // Read the products listing and load to Cloud Datastore.
  // Use batches of 20 for a transaction
  $json = json_decode(file_get_contents($url), true);
  $count = 1;
  foreach($json as $sku_key => $product_val) {
    if ($count == 1) {
          $transaction = $datastore->transaction();
    }
    add_product($datastore, $sku_key, $product_val);
        if ($count == 20) {
          $transaction->commit();
          $count = 0;
        } catch (Exception $err) {
        echo 'Caught exception: ',  $err->getMessage(), "\n";
      $transaction->rollback();
    }
    $count++;
  }
}

try
{
    $projectId = 'development';
    $url = 'https://raw.githubusercontent.com/BestBuyAPIs/open-data-set/master/products.json';
    load_datastore($projectId, $url);
} catch (Exception $err) {
    echo 'Caught exception: ',  $err->getMessage(), "\n";
  $transaction->rollback();
}
?>

抱歉没有说得更具体,但我是 python 标准环境 GAE 用户,对 PHP 环境相当不熟悉。

一般来说,您当前的方法是序列化和同步的 - 您一次处理一个行(或者,如果事务中的所有 upsert 调用实际转到,则最多以 20 个为一批)数据存储在一个批次中),阻塞每个数据存储交互并仅在该交互完成后才前进到下一行。

我不确定 PHP 环境是否支持异步数据存储操作 and/or 真正的批处理操作(python ndb 库最多可以将 500 次写入批处理成一个数据存储调用)- 这些可以帮助加快速度。

另一件需要考虑的事情如果您的行是完全独立的 - 您真的需要事务来写入它们吗?如果 PHP 支持纯书写,您可以这样做(交易需要更长的时间才能完成)。

即使没有上述支持,您仍然可以通过将行读取与等待数据存储操作完成分离来显着加快速度:

  • 在当前的请求处理程序中,您只保留行读取和创建 20 行的批次以某种方式传递给其他线程(任务队列,pub/sub,单独的线程 - 任何你可以进入 PHP)

  • 在单独的请求处理程序(或任务队列或 pub/sub 处理程序,具体取决于您选择如何传递批处理数据)上,您接收这些批处理并进行实际的数据存储调用。通过这种方式,您可以并行处理多个批次,从整体处理时间的角度来看,它们被阻塞等待数据存储回复的时间变得无关紧要。

使用这种方法,您的性能将仅受读取行和使这些批次排队的速度的限制。如果你想更快 - 你也可以将单个 CSV 文件拆分成多个较小的文件,因此也有多个可以并行工作的行阅读器,为那些批处理工作人员提供服务。

旁注:也许您想重试 failed/rolled-back 事务或保存这些实体以供稍后重试,目前看来您正在丢失它们。

这个问题类似于 and

快速回答是您可以使用 Apache Beam 或 Cloud Dataflow 将 CSV 数据导入 Cloud Datastore。

Google 提供预先编写的数据流模板。您可以使用 GCS 到数据存储数据流模板读取 CSV,将 CSV 转换为数据存储实体 JSON,然后将结果写入数据存储。

假设您有以下 CSV:

username, first, last, age, location.zip, location.city, location.state
samsmith, Sam, Smith, 33, 94040, Mountain View, California
johndoe, John, Doe, 50, 30075, Roswell, Georgia
dannyboy, Danny, Mac, 94040, Mountain View, California

您可以使用以下 UDF 将此 CSV 转换为 Kind People 的数据存储实体。此 UDF 假定以下架构:

  • 用户名 = 键和字符串 属性
  • 第一个=字符串属性
  • 最后一个 = 字符串 属性
  • 年龄 = 整数 属性
  • 位置=记录
  • Location.Zip = 整数 属性
  • Location.City = 字符串 属性
  • Location.State = 字符串 属性

这个 UDF 输出一个 JSON 编码的实体。这与 Cloud Datastore REST API. Values can be of the following types.

使用的 JSON 有效负载相同
function myTransform(csvString) {
 var row = csvString.split(",");
 if (row.length != 4) { return; }

 return JSON.stringify({
   "key": {
     "partition_id": {
       // default namespace is an empty string
       "namespace_id": ""
     },
     "path": {
       "kind": "People",
       "name": row[0]
     }
   },
   "properties": {
     "username": { "stringValue": row[0] },
     "first": { "stringValue": row[1] },
     "last": { "stringValue": row[2] },
     "age": { "integerValue": row[3] },
     "location": { 
       "entityValue": {
         "properties": {
           "zip": { "integerValue": row[4] },
           "city": { "stringValue": row[5] },
           "state": { "stringValue": row[6] }
         }
       }
     } 
   }
 });
}

到运行 数据流模板。首先使用 gsutil 将该 UDF 保存到 GCS 存储桶中。

gsutil cp my_csv_udf.js gs://mybucket/my_csv_udf.js

现在进入 Google 云平台控制台。前往数据流页面。单击从模板创建作业和 select "GCS Text to Datastore"。也可以参考这个doc.

您的作业参数如下所示:

  • textReadPattern = gs://path/to/data/*.csv
  • javascriptTextTransformGcsPath = gs://mybucket/my_csv_udf.js
  • javascriptTextTransformFunctionName = myTransform
  • datastoreWriteProjectId = 我的项目 ID
  • errorWritePath = gs://path/to/data/errors

注意:UDF 转换仅支持 JavaScript ECMAScript 5.1。所以只有基本的javascript,没有花哨的箭头函数/承诺...等等