Google Cloud Dataflow Custom Keys with common functionality

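We are using the Dataflow Java SDK, and we have a growing number of custom key classes that are almost identical.

I would like them to extend a common abstract class, but the Dataflow SDK seems to try to instantiate the abstract class itself, which causes an InstantiationException.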

Caused by: java.lang.RuntimeException: java.lang.InstantiationException
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
    at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:242)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:97)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:156)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:139)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:133)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:108)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:45)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1218)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
    at com.telstra.cdf.rmr.model.pardos.ParDoAbstractCampaignUAKeyExtractor.processElement(ParDoAbstractCampaignUAKeyExtractor.java:5

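This is our abstract class:

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public abstract class SuperClassKey  {
    public SuperClassKey(){}
    public abstract double getSomeValue();
}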

This is the subclass:

@DefaultCoder(AvroCoder.class)
public class SubClassKey extends SuperClassKey {
    public String foo;

    public SubClassKey() {
    }

    public SubClassKey(String foo){
        this.foo = foo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SubClassKey that = (SubClassKey) o;

        if (!foo.equals(that.foo)) return false;

        return true;
    }

    @Override
    public int hashCode() {
        return foo.hashCode();
    }

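    @Override
    public double getSomeValue() {
        // Assumption: foo holds a numeric string; returning the String itself would not compile.
        return Double.parseDouble(foo);
    }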
}

I have also tried using an interface, without success.

Is it possible for keys to share a common abstract class or interface?

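The problem is probably that you are using PCollection<SuperClassKey> rather than PCollection<SubClassKey>. A PCollection needs to be typed with the concrete class. If type inference is not sufficient, you can specify the coder explicitly with .setCoder(AvroCoder.of(SubClassKey.class)).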
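To illustrate why the coder needs the concrete class, here is a minimal sketch that reproduces the same exception with plain JDK reflection, without Dataflow or Avro on the classpath. It assumes that the decoder creates the record reflectively through the no-arg constructor, which is what the SpecificData.newInstance frame in the stack trace suggests. The class names mirror the ones in the question; getSomeValue is given a placeholder body, and tryInstantiate is a hypothetical helper added only for this demonstration.

```java
// Simplified copies of the question's classes, without the Dataflow annotations.
abstract class SuperClassKey {
    public SuperClassKey() {}
    public abstract double getSomeValue();
}

class SubClassKey extends SuperClassKey {
    public String foo;

    public SubClassKey() {}

    public SubClassKey(String foo) {
        this.foo = foo;
    }

    @Override
    public double getSomeValue() {
        // Placeholder body (assumption): the real computation is not shown in the question.
        return foo == null ? 0.0 : foo.length();
    }
}

class AbstractInstantiationDemo {
    // Hypothetical helper: returns "ok" on success, otherwise the
    // simple name of the reflective exception that was thrown.
    static String tryInstantiate(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // The abstract class has a public no-arg constructor but still cannot be instantiated.
        System.out.println(tryInstantiate(SuperClassKey.class));
        // The concrete subclass can be created reflectively.
        System.out.println(tryInstantiate(SubClassKey.class));
    }
}
```

If the coder is built from SuperClassKey, decoding ends up in the first situation; building it from SubClassKey puts it in the second.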

In my case, I changed the coder class. Example:

Before:

AvroIO.parseGenericRecords(RecordConverter::convert)
 .withCoder(AvroCoder.of(Struct.class)).from(...)

After:

AvroIO.parseGenericRecords(RecordConverter::convert)
 .withCoder(SerializableCoder.of(Struct.class)).from(...)
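
A possible explanation of why this helps with abstract or interface types: as far as I understand, SerializableCoder relies on standard Java serialization, which records the concrete class of each object in the stream, so the decoder never has to instantiate the declared (abstract) type. The sketch below shows that behaviour with the JDK only; it does not use Dataflow, and BaseKey, ConcreteKey and roundTrip are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical abstract base type; it must be Serializable for this approach.
abstract class BaseKey implements Serializable {
    private static final long serialVersionUID = 1L;
    public abstract String describe();
}

// Hypothetical concrete key type.
class ConcreteKey extends BaseKey {
    private static final long serialVersionUID = 1L;
    private final String foo;

    ConcreteKey(String foo) {
        this.foo = foo;
    }

    @Override
    public String describe() {
        return "ConcreteKey(" + foo + ")";
    }
}

class SerializationRoundTripDemo {
    // Serializes a value declared as the abstract type and reads it back.
    static BaseKey roundTrip(BaseKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (BaseKey) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        BaseKey copy = roundTrip(new ConcreteKey("abc"));
        // The decoded object keeps its concrete class even though it was handled as BaseKey.
        System.out.println(copy.getClass().getSimpleName() + " " + copy.describe());
    }
}
```

One caveat to check before using this for keys: Java serialization is not guaranteed to produce deterministic bytes, which may matter if the type is used as the key of a GroupByKey.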