java.lang.IllegalStateException: 无法 return 数据流中的默认编码器 2.X

java.lang.IllegalStateException: Unable to return a default Coder in dataflow 2.X

我在数据流 2.1 SDK 中有一个简单的管道。它从 pubsub 读取数据,然后对其应用 DoFn。

PCollection<MyClass> e = streamData.apply("ToE", ParDo.of(new MyDoFNClass()));

在此管道上出现以下错误:

java.lang.IllegalStateException: Unable to return a default Coder for ToEvents/ParMultiDo(MyDoFNClass).out0 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.X.X.model.MyClass.

MyDoFn class 如下:

@DefaultCoder(AvroCoder.class)

public class MyClass{

    public long id;
    public HashMap<String,HashSet<String>> a;

    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}

找到解决方案只需要将 implements Serializable 添加到 MyClass

@DefaultCoder(AvroCoder.class)

public class MyClass implements Serializable {

public long id;
public HashMap<String,HashSet<String>> a;

@SerializedName("a")
public Integer Id;
@SerializedName("ae")
public String ae;
}

以下是 Beam 编程指南中有关编码器的一些文档

Beam SDK 需要为管道中的每个 PCollection 编写一个编码器。在大多数情况下,Beam SDK 能够根据其元素类型或生成它的转换自动推断 PCollection 的编码器,但是,在某些情况下,管道作者将需要明确指定编码器,或为 PCollection 开发编码器他们的自定义类型。

每个管道对象都有一个 CoderRegistry 对象,它将语言类型映射到管道应该用于这些类型的默认编码器。您可以自己使用 CoderRegistry 查找给定类型的默认编码器,或为给定类型注册新的默认编码器。

转到下方 link 查看 beam 库使用的默认编码器 - https://beam.apache.org/documentation/programming-guide/#default-coders-and-the-coderregistry

如果您在 pCollections 中使用的对象不在默认编码器中,您可能必须为该对象提供自定义编码器。 例如如果您查看 PubsubIO.write()/PubsubIO.read() 方法的实现,它们使用自定义编码器。例如PubsubMessagePayloadOnlyCoder

假设您要将字符串转换为 Pubsub 消息。您可以将此编码器提供给您的 pcollection。

PCollection<PubsubMessage> pubsubMessagePCollection = pCollectionTuple.get(accountId);
pubsubMessagePCollection.setCoder(PubsubMessagePayloadOnlyCoder.of());