Reading from PubsubIO writing to DatastoreIO

Is it possible to create a pipeline that reads data from Pub/Sub and writes to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that DatastoreIO.writeTo cannot be used together with options.setStreaming(true), while the latter is required in order to use PubsubIO as input. Is there a way around this? Or is it simply not possible to read from Pub/Sub and write to Datastore?

Here's my code:

DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);

    options.setRunner(DataflowPipelineRunner.class);
    options.setProject(projectName);
    options.setStagingLocation("gs://my-staging-bucket/staging");
    options.setStreaming(true);

    Pipeline p = Pipeline.create(options);

    PCollection<String> input = p.apply(
            PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));
    PCollection<String> inputWindow = input.apply(
            Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
                    .triggering(AfterPane.elementCountAtLeast(1))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.standardHours(1)));
    PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void processElement(ProcessContext c) {
            // The Pub/Sub payloads are Base64-encoded; decode them back to plain strings.
            String msg = c.element();
            byte[] decoded = Base64.decodeBase64(msg.getBytes());
            String outmsg = new String(decoded);
            c.output(outmsg);
        }
    }));
    PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(
            ParDo.of(new CreateEntityFn("stream", "events")));

    inputEntity.apply(DatastoreIO.writeTo(datasetid));


    p.run();

And this is the exception I get:

Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)

The DatastoreIO sink is not currently supported by the streaming runner. To write to Datastore from a streaming pipeline, you can make direct calls to the Datastore API from a DoFn.

OK, after banging my head against the wall for a while, I finally got it working. As danielm suggested, I'm calling the Datastore API from a ParDo DoFn. One problem was that I hadn't realized there is a separate API for using Cloud Datastore outside of AppEngine (com.google.api.services.datastore... vs. com.google.appengine.api.datastore...). Another problem was that there is apparently some kind of bug in the latest version of the Cloud Datastore API (google-api-services-datastore-protobuf v1beta2-rev1-4.0.0; I got an IllegalAccessError), which I worked around by using an older version (v1beta2-rev1-2.1.2).
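For anyone who mixes the two up: only the standalone client works on Dataflow workers, while the AppEngine-bundled classes compile fine but generally fail at runtime outside the AppEngine environment. A minimal illustration of the import difference (the class names come from the two actual packages; nothing else is implied):

// AppEngine-bundled Datastore API: only usable inside the AppEngine runtime.
// import com.google.appengine.api.datastore.DatastoreServiceFactory;

// Standalone Cloud Datastore API: works anywhere, including on Dataflow workers.
import com.google.api.services.datastore.client.Datastore;
import com.google.api.services.datastore.client.DatastoreFactory;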

So, here's my working code:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.api.services.datastore.DatastoreV1.*;
import com.google.api.services.datastore.client.Datastore;
import com.google.api.services.datastore.client.DatastoreException;
import com.google.api.services.datastore.client.DatastoreFactory;
import static com.google.api.services.datastore.client.DatastoreHelper.*;
import java.security.GeneralSecurityException;
import java.io.IOException;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

//--------------------

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);

    options.setRunner(DataflowPipelineRunner.class);
    options.setProject(projectName);
    options.setStagingLocation("gs://my-staging-bucket/staging");
    options.setStreaming(true);

    Pipeline p = Pipeline.create(options);
    PCollection<String> input = p.apply(
            PubsubIO.Read.topic("projects/" + projectName + "/topics/my-topic-name"));

    input.apply(ParDo.of(new DoFn<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws ParseException, DatastoreException {

            JSONObject json = (JSONObject) new JSONParser().parse(c.element());

            // Connect to Cloud Datastore with the standalone client;
            // credentials and dataset settings are read from the environment.
            Datastore datastore = null;
            try {
                datastore = DatastoreFactory.get().create(getOptionsFromEnv()
                        .dataset(datasetid).build());
            } catch (GeneralSecurityException exception) {
                System.err.println("Security error connecting to the datastore: " + exception.getMessage());
            } catch (IOException exception) {
                System.err.println("I/O error connecting to the datastore: " + exception.getMessage());
            }

            // Build an entity with an incomplete key of kind "my-kind" in "my-namespace";
            // Datastore assigns the id via addInsertAutoId below.
            Key.Builder keyBuilder = makeKey("my-kind");
            keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace");
            Entity.Builder event = Entity.newBuilder()
                    .setKey(keyBuilder);

            event.addProperty(makeProperty("my-prop", makeValue((String) json.get("my-prop"))));

            // Write the entity with a single non-transactional commit.
            CommitRequest commitRequest = CommitRequest.newBuilder()
                    .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
                    .setMutation(Mutation.newBuilder().addInsertAutoId(event))
                    .build();
            if (datastore != null) {
                datastore.commit(commitRequest);
            }
        }
    }));


    p.run();
}
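A possible refinement, not part of the original answer but a sketch of the same approach: the code above opens a new Datastore connection for every single element. The SDK 1.x DoFn lifecycle exposes startBundle, so the client can be created once per bundle instead:

input.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;
    // The client is not serializable, so keep it transient and rebuild it per bundle.
    private transient Datastore datastore;

    @Override
    public void startBundle(Context c) throws GeneralSecurityException, IOException {
        // Create the client once per bundle rather than once per element.
        datastore = DatastoreFactory.get().create(getOptionsFromEnv()
                .dataset(datasetid).build());
    }

    @Override
    public void processElement(ProcessContext c) throws ParseException, DatastoreException {
        // ... parse the JSON, build the entity, and commit exactly as above ...
    }
}));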

And the dependencies in pom.xml:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>[1.0.0,2.0.0)</version>
</dependency>
<dependency>
  <groupId>com.google.apis</groupId>
  <artifactId>google-api-services-datastore-protobuf</artifactId>
  <version>v1beta2-rev1-2.1.2</version>
</dependency>
<dependency>
  <groupId>com.google.http-client</groupId>
  <artifactId>google-http-client</artifactId>
  <version>1.17.0-rc</version>
</dependency>
<!-- Some more.. like JUnit etc..  -->