Dataflow SDK 2.x: how to consume from PubsubIO using Java serialization
I'm new to Dataflow, and I'm trying to migrate the following snippet from Java SDK 1.9.0 to 2.3.0:
//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
PubsubIO.Read.named("Read from Pubsub")
.topic(myTopic)
.withCoder(SerializableCoder.of(MyType.class))
.timestampLabel("myDate"));
I would convert it to:
//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
PubsubIO.<MyType>read () // <-- COMPILE ERROR here, private method
.fromTopic(myTopic)
.withTimestampAttribute ("myDate"))
.setCoder(SerializableCoder.of(MyType.class));
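But as of Java SDK 2.3.0, the PubsubIO.read() method is private.
So I need to consume messages that carry serialized instances of MyType, but the methods PubsubIO exposes seem to cover only text messages, avro, protobuf and so on.
How can I read from a PubsubIO topic whose messages contain serialized Java objects?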
UPDATE:
I could adapt it like this (not tried yet)...
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
PubsubIO.readMessagesWithAttributes ()
.fromTopic(myTopic)
.withTimestampAttribute ("myDate"))
.apply (MapElements.via(new SimpleFunction<PubsubMessage, MyType> () {
@Override
public MyType apply (final PubsubMessage message) {
final byte[] payload = message.getPayload ();
try {
try (final ObjectInputStream stream = new ObjectInputStream (new ByteArrayInputStream (payload))) {
return (MyType) stream.readObject ();
}
} catch (IOException | ClassNotFoundException e) {
// Both failures mean the payload could not be decoded into MyType
throw new RuntimeException (e);
}
}
}));
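Your updated code looks like it should work. Note that there is also PubsubIO.readPubsubMessagesWithoutAttributes(), if you are not using the attribute map. (In the 2.2.0 and later releases I believe the payload-only variant is named PubsubIO.readMessages(), so check the javadoc for your SDK version.)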
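The decoding step in that update does not depend on Beam, so it can be checked in isolation before wiring it into the pipeline. Below is a minimal sketch using only the JDK. The nested MyType here is a hypothetical stand-in (the real class is not shown in the question), and PayloadCodec, toPayload and fromPayload are names made up for illustration. toPayload mimics what a publisher would presumably do to produce the message bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class PayloadCodec {

    // Hypothetical stand-in for the question's MyType (assumption: it is Serializable).
    static class MyType implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final long myDate;

        MyType(String name, long myDate) {
            this.name = name;
            this.myDate = myDate;
        }
    }

    // Serializes a value the way a publisher using plain Java serialization would.
    static byte[] toPayload(MyType value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Same logic as the body of the SimpleFunction in the update:
    // turn the raw message payload back into a MyType instance.
    static MyType fromPayload(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (MyType) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        MyType decoded = fromPayload(toPayload(new MyType("example", 1500000000000L)));
        System.out.println(decoded.name + " " + decoded.myDate);
    }
}
```

If this round trip works for your real MyType, the same fromPayload body should behave the same way inside MapElements, given that the class is on the workers' classpath.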
The previous functionality was removed in PR#2634 and replaced with dedicated methods for the most common encoding types (proto, avro, strings).
I suspect that arbitrary object decoding via SerializableCoder was not kept because of the inherent dangers of relying on Java serialization. See the SerializableCoder javadoc or the related question "Java serialization - advantages and disadvantages, use or avoid?". However, if you feel the API is lacking, the Beam SDK is open source and the community welcomes contributions.
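If you do keep Java serialization for the payload, one way to reduce the risk mentioned above is to restrict which classes the stream may instantiate. The sketch below assumes Java 9 or later, where java.io.ObjectInputFilter and ObjectInputStream.setObjectInputFilter are available. It is not something PubsubIO or Beam does for you. MyType is again a hypothetical stand-in, and FilteredDecoding, ALLOWED_CLASSES, serialize and fromPayload are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Set;

class FilteredDecoding {

    // Hypothetical stand-in for the question's MyType.
    static class MyType implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        MyType(String name) {
            this.name = name;
        }
    }

    // Classes permitted in the stream: the payload type plus the types of its fields.
    static final Set<Class<?>> ALLOWED_CLASSES = Set.of(MyType.class, String.class);

    // Rejects any class that is not explicitly listed above.
    static final ObjectInputFilter ALLOW_LIST = info -> {
        Class<?> type = info.serialClass();
        if (type == null) {
            // The filter is also consulted for non-class checks; leave those undecided.
            return ObjectInputFilter.Status.UNDECIDED;
        }
        return ALLOWED_CLASSES.contains(type)
                ? ObjectInputFilter.Status.ALLOWED
                : ObjectInputFilter.Status.REJECTED;
    };

    // Decodes a payload, failing with IllegalArgumentException if the bytes are
    // malformed or contain a class outside the allow-list.
    static MyType fromPayload(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            in.setObjectInputFilter(ALLOW_LIST);
            return (MyType) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("Payload is not an acceptable MyType", e);
        }
    }

    // Helper for trying the filter out: serializes any object to bytes.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(fromPayload(serialize(new MyType("example"))).name);
    }
}
```

A real MyType with richer fields would need each of those field classes (and any array types) added to the allow-list. Anyone able to publish to the topic can still send arbitrary bytes, so this narrows the attack surface rather than removing it.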