DataFlow SDK 2.x: how to consume from PubSubIO using Java serialization

I'm new to Dataflow, and I'm trying to migrate the following snippet from Java SDK 1.9.0 to 2.3.0:

//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
  PubsubIO.Read.named("Read from Pubsub")
  .topic(myTopic)
  .withCoder(SerializableCoder.of(MyType.class))
  .timestampLabel("myDate"));

I would convert it to:

//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.<MyType>read () // <-- COMPILE ERROR here, private method
  .fromTopic(myTopic)
  .withTimestampAttribute ("myDate"))
.setCoder(SerializableCoder.of(MyType.class));

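However, as of Java SDK 2.3.0 the PubsubIO.read() method is private.

So I need to consume messages that carry serialized MyType instances, but the methods PubsubIO exposes seem to cover only text, Avro, protobuf, and similar messages.

How can I read from a PubsubIO topic whose messages contain serialized Java objects?

Update:

I could adapt it like this (not tried yet)...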

PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.readMessagesWithAttributes ()
  .fromTopic(myTopic)
  .withTimestampAttribute ("myDate"))
.apply (MapElements.via(new SimpleFunction<PubsubMessage, MyType> () {
        @Override
        public MyType apply (final PubsubMessage message) {
            final byte[] payload = message.getPayload ();
            // try-with-resources closes the stream; both failure modes are rethrown unchecked
            try (final ObjectInputStream stream = new ObjectInputStream (new ByteArrayInputStream (payload))) {
                return (MyType) stream.readObject ();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException (e);
            }
        }
    }));

Your updated code looks like it should work. Note that there is also PubsubIO.readPubsubMessagesWithoutAttributes() if you are not using the attribute map.
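To check the decoding step on its own before running the pipeline, a round trip along these lines may help. It is only a sketch: `MyType` here is a stand-in with made-up fields, `PayloadCodec` and its method names are hypothetical, and it assumes the publisher writes each payload with a plain `ObjectOutputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the MyType class from the question.
final class MyType implements Serializable {
    private static final long serialVersionUID = 1L;
    final String id;
    final long value;

    MyType(String id, long value) {
        this.id = id;
        this.value = value;
    }
}

// Hypothetical helper holding the same decoding logic as the update, outside any pipeline.
final class PayloadCodec {
    private PayloadCodec() {}

    // Assumed publisher side: the payload is the object written by ObjectOutputStream.
    static byte[] toPayload(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream stream = new ObjectOutputStream(bytes)) {
            stream.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are copied out.
        return bytes.toByteArray();
    }

    // Consumer side: mirrors the body of apply() in the question's update.
    static MyType fromPayload(byte[] payload) {
        try (ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (MyType) stream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

With a helper like this, the body of `apply` in the `SimpleFunction` could reduce to `return PayloadCodec.fromPayload(message.getPayload());`, and the decoding can be unit tested without Pub/Sub.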

The previous functionality was removed in PR#2634 and replaced with specialized methods for the most common encoding types (proto, Avro, strings).

I suspect that arbitrary object decoding via SerializableCoder was not retained because of the inherent dangers of relying on Java serialization. See the SerializableCoder javadoc or the related question "Java serialization - advantages and disadvantages, use or avoid?". However, if you feel the API is lacking, the Beam SDK is open source and the community welcomes contributions.
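If you do stay with Java serialization, one way to reduce that risk is to restrict which classes the stream is allowed to instantiate. The following is a minimal sketch, assuming Java 9 or later (for `java.io.ObjectInputFilter`). `FilteredDecoder`, `encode`, `decode`, and `SampleEvent` are hypothetical names used for illustration, not part of Beam, and the allowlist (the expected class plus `String`) would need extending for types with richer fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical message type with only primitive and String fields.
final class SampleEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    final String id;
    final long value;

    SampleEvent(String id, long value) {
        this.id = id;
        this.value = value;
    }
}

// Hypothetical helper: Java deserialization restricted to an expected class.
final class FilteredDecoder {
    private FilteredDecoder() {}

    // Only used here to produce sample payloads.
    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream stream = new ObjectOutputStream(bytes)) {
            stream.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static <T> T decode(byte[] payload, Class<T> expected) {
        // Reject every class descriptor except the expected type and String.
        // Checks that carry no class (back-references, stream statistics) stay undecided.
        ObjectInputFilter filter = info -> {
            Class<?> seen = info.serialClass();
            if (seen == null) {
                return ObjectInputFilter.Status.UNDECIDED;
            }
            boolean allowed = expected.equals(seen) || String.class.equals(seen);
            return allowed ? ObjectInputFilter.Status.ALLOWED : ObjectInputFilter.Status.REJECTED;
        };
        try (ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            // The filter has to be installed before the first object is read.
            stream.setObjectInputFilter(filter);
            return expected.cast(stream.readObject());
        } catch (IOException e) {
            // A rejected class surfaces as java.io.InvalidClassException, an IOException.
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A filter like this does not make Java serialization safe in general, but it narrows what a malformed or hostile message can cause the worker to instantiate.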