403 error when attempting to access Google Cloud Datastore through Dataflow

I have a Google App Engine application whose data is stored in Google Cloud Datastore. I want to use Dataflow to put some of that data into BigQuery, but I figured I'd start by pulling some information out of Datastore and writing it to Google Cloud Storage. My code looks like this:

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Query;
import com.google.api.services.datastore.DatastoreV1.Value;
import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class DatastorePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(DatastorePipeline.class);

    static class GetEmailFn extends DoFn<Entity, String> {

        @Override
        public void processElement(ProcessContext c) throws Exception {
            Map<String, Value> properties = DatastoreHelper.getPropertyMap(c.element());
            Value value = properties.get("email_address");
            if (value != null) {
                c.output(DatastoreHelper.getString(value));
            }
        }
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        Query.Builder q = Query.newBuilder();
        q.addKindBuilder().setName("User");
        Query query = q.build();

        DatastoreIO.Source source = DatastoreIO.source()
                .withDataset("my-project-id")
                .withQuery(query);

        p.apply("ReadUsersFromDatastore", Read.from(source))
                .apply(ParDo.named("GetEmailAddress").of(new GetEmailFn()))
                .apply(TextIO.Write.to("gs://dataflow-output-bucket/emails.txt"));

        p.run();
    }
}

However, when I try to run it, I get a 403 error when the Datastore query is made:

Request failed with code 403, will NOT retry: https://www.googleapis.com/datastore/v1beta2/datasets/my-project-id/runQuery

I'm running this from Eclipse using the Google Cloud Dataflow plugin. Dataflow jobs that don't read from Datastore work fine. I ran

gcloud auth login

before running the job, as described in the tutorial. What am I doing wrong?

Edit: here is the full stack trace.

Oct 11, 2015, 12:03:13 PM (b6119cca307b4d9a): com.google.api.services.datastore.client.DatastoreException: Unauthorized.
    at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
    at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
    at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
    at com.google.api.services.datastore.client.QuerySplitterImpl.getScatterKeys(QuerySplitterImpl.java:189)
    at com.google.api.services.datastore.client.QuerySplitterImpl.getSplits(QuerySplitterImpl.java:75)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.getSplitQueries(DatastoreIO.java:427)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.splitIntoBundles(DatastoreIO.java:306)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSplit(BasicSerializableSourceFormat.java:318)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSourceOperation(BasicSerializableSourceFormat.java:167)
    at com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor.execute(SourceOperationExecutor.java:80)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:257)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:146)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:164)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:145)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.http.HttpResponseException: 403 Forbidden
Unauthorized.
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1061)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
    ... 19 more

Answer: it turned out the problem was that my project restricts access to users from my company's domain, which was blocking the service account from connecting. Thanks to Dan for helping me work through it!

It looks like the permissions for your Datastore are not configured correctly.

Here are two general recommendations:

  1. It's worth reviewing the Google Cloud Dataflow Security and Permissions documentation.
  2. Was the Datastore created in the same project as the one in which you're running the job?

However, in your case you've run into the following issue:

  1. Is the associated App Engine project locked down to users from a specific domain? If so, there is an issue in the current beta release of Cloud Datastore that prevents the Dataflow service account (the email address ending in @cloudservices.gserviceaccount.com) from accessing the data.

    If you're using the OAuth API, we can apply a temporary workaround, at a small cost: it will no longer enforce that users come from your app's domain. If that is an important requirement for you, you can perform the domain enforcement in your own code. (The regular Users API is unaffected.)

    To request the temporary workaround, you can email us at dataflow-feedback@google.com, mentioning this issue and including your numeric project ID.
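For the "domain enforcement in your own code" suggestion above, here is a minimal sketch of what such a check could look like. This is plain Java, not part of any Google API; `example.com` is a placeholder for your app's domain, and how you obtain the authenticated user's email address depends on your setup.

```java
// Hypothetical sketch: enforce the domain restriction in application code
// instead of relying on the project-level domain lockdown.
public class DomainCheck {

    // Returns true only when the email's domain matches the allowed domain.
    static boolean isAllowedDomain(String email, String allowedDomain) {
        int at = email.lastIndexOf('@');
        return at >= 0 && email.substring(at + 1).equalsIgnoreCase(allowedDomain);
    }

    public static void main(String[] args) {
        // "example.com" stands in for your app's domain.
        System.out.println(isAllowedDomain("alice@example.com", "example.com"));                     // true
        System.out.println(isAllowedDomain("job@cloudservices.gserviceaccount.com", "example.com")); // false
    }
}
```

With a check like this in place, lifting the project-level restriction only affects who can authenticate, not who your application actually lets in.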