Reading from PubsubIO writing to DatastoreIO
Is it possible to create a pipeline that reads from Pub/Sub and writes to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that DatastoreIO.writeTo cannot be used together with options.setStreaming(true), which is required in order to use PubsubIO as input. Is there a way around this, or is it simply not possible to read from Pub/Sub and write to Datastore?
Here is my code:
DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);

Pipeline p = Pipeline.create(options);

PCollection<String> input = p.apply(
        PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));

PCollection<String> inputWindow = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
              .triggering(AfterPane.elementCountAtLeast(1))
              .discardingFiredPanes()
              .withAllowedLateness(Duration.standardHours(1)));

PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
        // The message payload is Base64-encoded; decode it back to a plain string.
        String msg = c.element();
        byte[] decoded = Base64.decodeBase64(msg.getBytes());
        c.output(new String(decoded));
    }
}));

PCollection<DatastoreV1.Entity> inputEntity =
        inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));

inputEntity.apply(DatastoreIO.writeTo(datasetid));

p.run();
And this is the exception I get:
Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)
The DatastoreIO sink is not currently supported by the streaming runner. To write to Datastore from a streaming pipeline, you can make direct calls to the Datastore API from a DoFn.
OK, after banging my head against the wall for a while, I finally got this working. As danielm suggested, I am calling the Datastore API from a ParDo DoFn. One problem was that I hadn't realized there is a separate API for using Cloud Datastore outside of App Engine (com.google.api.services.datastore... vs. com.google.appengine.api.datastore...). Another problem was an apparent bug in the latest version of the Cloud Datastore API (google-api-services-datastore-protobuf v1beta2-rev1-4.0.0; I got an IllegalAccessError), which I worked around by using an older version (v1beta2-rev1-2.1.2).
So, here is my working code:
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

import com.google.api.services.datastore.DatastoreV1.*;
import com.google.api.services.datastore.client.Datastore;
import com.google.api.services.datastore.client.DatastoreException;
import com.google.api.services.datastore.client.DatastoreFactory;
import static com.google.api.services.datastore.client.DatastoreHelper.*;

import java.security.GeneralSecurityException;
import java.io.IOException;

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

//--------------------

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setProject(projectName);
    options.setStagingLocation("gs://my-staging-bucket/staging");
    options.setStreaming(true);

    Pipeline p = Pipeline.create(options);

    PCollection<String> input = p.apply(
            PubsubIO.Read.topic("projects/" + projectName + "/topics/my-topic-name"));

    input.apply(ParDo.of(new DoFn<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws ParseException, DatastoreException {
            JSONObject json = (JSONObject) new JSONParser().parse(c.element());

            // Connect to Cloud Datastore via the client API (not the App Engine API).
            Datastore datastore = null;
            try {
                datastore = DatastoreFactory.get().create(getOptionsFromEnv()
                        .dataset(datasetid).build());
            } catch (GeneralSecurityException exception) {
                System.err.println("Security error connecting to the datastore: " + exception.getMessage());
            } catch (IOException exception) {
                System.err.println("I/O error connecting to the datastore: " + exception.getMessage());
            }

            // Build an entity with an auto-ID key of kind "my-kind" in "my-namespace".
            Key.Builder keyBuilder = makeKey("my-kind");
            keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace");
            Entity.Builder event = Entity.newBuilder()
                    .setKey(keyBuilder);
            event.addProperty(makeProperty("my-prop", makeValue((String) json.get("my-prop"))));

            // Commit the entity directly, bypassing the unsupported DatastoreIO sink.
            CommitRequest commitRequest = CommitRequest.newBuilder()
                    .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
                    .setMutation(Mutation.newBuilder().addInsertAutoId(event))
                    .build();
            if (datastore != null) {
                datastore.commit(commitRequest);
            }
        }
    }));

    p.run();
}
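One thing worth noting about the code above: it creates a new Datastore client for every element. A possible refinement (my own sketch, not part of the original answer; datasetid and the entity layout are the same assumed placeholders as above) is to create the client once per bundle in startBundle() and reuse it across elements:

input.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;
    // Reused across all elements of a bundle; transient because the client
    // itself is not serializable and is recreated on each worker.
    private transient Datastore datastore;

    @Override
    public void startBundle(Context c) throws GeneralSecurityException, IOException {
        // "datasetid" is assumed to be accessible here, e.g. as a field of
        // the enclosing class, just as in the code above.
        datastore = DatastoreFactory.get().create(getOptionsFromEnv()
                .dataset(datasetid).build());
    }

    @Override
    public void processElement(ProcessContext c) throws ParseException, DatastoreException {
        JSONObject json = (JSONObject) new JSONParser().parse(c.element());
        Key.Builder keyBuilder = makeKey("my-kind");
        keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace");
        Entity.Builder event = Entity.newBuilder().setKey(keyBuilder);
        event.addProperty(makeProperty("my-prop", makeValue((String) json.get("my-prop"))));
        // Same non-transactional auto-ID insert as above, but with the shared client.
        datastore.commit(CommitRequest.newBuilder()
                .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
                .setMutation(Mutation.newBuilder().addInsertAutoId(event))
                .build());
    }
}));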
And the dependencies in pom.xml:
<dependency>
    <groupId>com.google.cloud.dataflow</groupId>
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
    <version>[1.0.0,2.0.0)</version>
</dependency>
<dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-datastore-protobuf</artifactId>
    <version>v1beta2-rev1-2.1.2</version>
</dependency>
<dependency>
    <groupId>com.google.http-client</groupId>
    <artifactId>google-http-client</artifactId>
    <version>1.17.0-rc</version>
</dependency>
<!-- Some more.. like JUnit etc.. -->