Compiling fails when creating MapFunction with super type

I'm using Flink 1.12.0 and have the following simple test case:

I defined two model classes (AbstractDataModel is the supertype and ConcreteModel is the subtype):

public interface AbstractDataModel {
    public String getValue();
}

public class ConcreteModel implements AbstractDataModel {
    private String key;
    private String value;

    public ConcreteModel() {
    }

    public ConcreteModel(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

Then I define a simple application, shown below, that maps each ConcreteModel to a string.

The MapFunction uses the supertype AbstractDataModel, but compilation fails with:

Required type:
MapFunction<com.ConcreteModel,java.lang.String>
Provided:
MyMapFunction

How can I fix this if I still want to use AbstractDataModel as the generic input type of the MapFunction?

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class MyMapFunction implements MapFunction<AbstractDataModel, String> {

    public String map(AbstractDataModel model) throws Exception {
        return model.getValue();
    }
}

public class ConcreteModelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //        env.registerType(ConcreteModel.class);
        //        env.registerType(AbstractDataModel.class);
        //
        DataStream<String> ds = env.fromElements(new ConcreteModel("a", "1"), new ConcreteModel("b", "2")).map(new MyMapFunction());
        ds.print();
        env.execute();
    }
}

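The compile error comes from Java generics, not from Flink's runtime. env.fromElements(new ConcreteModel("a", "1"), new ConcreteModel("b", "2")) returns a DataStream<ConcreteModel>, and DataStream<T>.map is declared as map(MapFunction<T, R> mapper), so it requires a MapFunction<ConcreteModel, R>. Generic types are invariant, which means a MapFunction<AbstractDataModel, String> is not a MapFunction<ConcreteModel, String>, even though ConcreteModel implements AbstractDataModel.

Flink also has a separate, runtime concern: it must derive type information for every stream. When a class cannot be analyzed as a POJO, Flink falls back to treating it as a generic type (serialized with Kryo) and logs a message such as the following (see "Data Types & Serialization" in the Flink documentation):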

15:45:51,460 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class … cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on “Data Types & Serialization” for details of the effect on performance.
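The compile error can be reproduced in plain Java without Flink, which shows it is a generics issue. This is a minimal sketch. Mapper is a hypothetical stand-in for MapFunction, and mapExact/mapSuper are hypothetical helpers. mapExact has the same shape as DataStream<T>.map(MapFunction<T, R>). mapSuper shows that a "? super T" signature would have accepted a function written against the supertype.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VarianceDemo {
    // Hypothetical stand-in for Flink's MapFunction<T, O>; not the Flink interface.
    interface Mapper<T, O> {
        O map(T value);
    }

    interface AbstractDataModel {
        String getValue();
    }

    static class ConcreteModel implements AbstractDataModel {
        private final String value;

        ConcreteModel(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }

    // Invariant in T, like DataStream<T>.map(MapFunction<T, R>):
    // the mapper's input type must be exactly the element type.
    static <T, R> List<R> mapExact(List<T> in, Mapper<T, R> f) {
        List<R> out = new ArrayList<>();
        for (T t : in) {
            out.add(f.map(t));
        }
        return out;
    }

    // Contravariant in T: any mapper over a supertype of T is accepted.
    static <T, R> List<R> mapSuper(List<T> in, Mapper<? super T, R> f) {
        List<R> out = new ArrayList<>();
        for (T t : in) {
            out.add(f.map(t));
        }
        return out;
    }

    public static void main(String[] args) {
        List<ConcreteModel> models =
                Arrays.asList(new ConcreteModel("1"), new ConcreteModel("2"));
        Mapper<AbstractDataModel, String> getValue = AbstractDataModel::getValue;

        // mapExact(models, getValue); // does not compile: T cannot be both types
        System.out.println(mapSuper(models, getValue)); // prints [1, 2]

        // Widening the element type to the supertype makes the exact form compile.
        List<AbstractDataModel> widened = new ArrayList<>(models);
        System.out.println(mapExact(widened, getValue)); // prints [1, 2]
    }
}
```

Because Flink's map uses the invariant form, the practical fix is to make the stream's element type AbstractDataModel.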

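To keep AbstractDataModel as the input type, declare the elements with the supertype (AbstractDataModel concreteModel01 = ...). fromElements then returns a DataStream<AbstractDataModel>, which matches MyMapFunction. You can also implement ResultTypeQueryable and define the return type in public TypeInformation<String> getProducedType(). The produced type is the function's output type, so return TypeInformation.of(String.class), not TypeInformation.of(AbstractDataModel.class). The ResultTypeQueryable Javadoc describes the interface as follows:

This interface can be implemented by functions and input formats to tell the framework about their produced data type. This method acts as an alternative to the reflection analysis that is otherwise performed and is useful in situations where the produced data type may vary depending on parametrization.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

// Input type is the supertype AbstractDataModel; output type is String.
public class MyMapFunction implements MapFunction<AbstractDataModel, String>,
    ResultTypeQueryable<String> {

    @Override
    public String map(AbstractDataModel value) throws Exception {
        return value.getValue();
    }

    // Declares the output type directly instead of relying on reflection.
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}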
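The Javadoc's phrase "alternative to the reflection analysis" can be illustrated in plain Java. This is a rough analogy under stated assumptions, not Flink's TypeExtractor. Mapper and TypeQueryable are hypothetical stand-ins for MapFunction and ResultTypeQueryable, and outputTypeName is a hypothetical helper. A named class keeps its generic interface arguments, the class generated for a lambda does not, and an explicit declaration avoids reflection altogether.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ProducedTypeDemo {
    // Hypothetical stand-ins, loosely modelled on MapFunction and ResultTypeQueryable.
    interface Mapper<T, O> {
        O map(T value);
    }

    interface TypeQueryable<O> {
        Class<O> producedType();
    }

    // A named class keeps its generic interface arguments in the class file.
    static class NamedMapper implements Mapper<Object, String> {
        public String map(Object value) {
            return String.valueOf(value);
        }
    }

    // Wraps any mapper and states its output type explicitly.
    static class DeclaredMapper<T, O> implements Mapper<T, O>, TypeQueryable<O> {
        private final Mapper<T, O> inner;
        private final Class<O> outType;

        DeclaredMapper(Mapper<T, O> inner, Class<O> outType) {
            this.inner = inner;
            this.outType = outType;
        }

        public O map(T value) {
            return inner.map(value);
        }

        public Class<O> producedType() {
            return outType;
        }
    }

    // Prefer the explicit declaration; otherwise read the Mapper type
    // arguments via reflection, which fails when they were erased.
    static String outputTypeName(Mapper<?, ?> f) {
        if (f instanceof TypeQueryable) {
            return ((TypeQueryable<?>) f).producedType().getName();
        }
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Mapper.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1].getTypeName();
            }
        }
        return "unknown";
    }

    public static void main(String[] args) {
        Mapper<Object, String> lambda = v -> String.valueOf(v);
        System.out.println(outputTypeName(new NamedMapper()));  // java.lang.String
        System.out.println(outputTypeName(lambda));             // unknown (no generic signature)
        System.out.println(outputTypeName(new DeclaredMapper<>(lambda, String.class))); // java.lang.String
    }
}
```

In this question MyMapFunction is a named class, so reflection alone could recover String. ResultTypeQueryable matters more for lambdas or for functions whose output type depends on constructor parameters.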
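
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConcreteModelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Declared with the supertype, so fromElements(...) infers
        // DataStream<AbstractDataModel>, which matches MyMapFunction's input type.
        AbstractDataModel concreteModel01 = new ConcreteModel("a", "1");
        AbstractDataModel concreteModel02 = new ConcreteModel("a", "2");
        DataStream<String> ds = env
                .fromElements(concreteModel01, concreteModel02)
                .map(new MyMapFunction());
        ds.print();
        env.execute();
    }
}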
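Declaring the two variables as AbstractDataModel works because fromElements(...) is the receiver of .map(...), so Java infers its type parameter from the arguments alone. The sketch below imitates this. MiniStream and its of/ofType helpers are hypothetical, not Flink APIs. ofType mirrors the idea behind Flink's fromElements(Class<OUT> type, OUT... data) overload, which takes the base type explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class InferenceDemo {
    interface AbstractDataModel {
        String getValue();
    }

    static class ConcreteModel implements AbstractDataModel {
        private final String value;

        ConcreteModel(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }

    // Hypothetical mini stream; map is invariant in T, like DataStream<T>.map.
    static final class MiniStream<T> {
        private final List<T> items;

        MiniStream(List<T> items) {
            this.items = items;
        }

        <R> MiniStream<R> map(Function<T, R> f) {
            List<R> out = new ArrayList<>();
            for (T t : items) {
                out.add(f.apply(t));
            }
            return new MiniStream<>(out);
        }

        List<T> items() {
            return items;
        }
    }

    // As the receiver of .map(...), T is inferred from the arguments only.
    @SafeVarargs
    static <T> MiniStream<T> of(T... data) {
        return new MiniStream<>(Arrays.asList(data));
    }

    // The Class argument pins T to the given base type; it is not otherwise used.
    @SafeVarargs
    static <T> MiniStream<T> ofType(Class<T> type, T... data) {
        return new MiniStream<>(Arrays.asList(data));
    }

    static final Function<AbstractDataModel, String> GET_VALUE = AbstractDataModel::getValue;

    public static void main(String[] args) {
        // of(new ConcreteModel("1")).map(GET_VALUE); // does not compile: MiniStream<ConcreteModel>

        AbstractDataModel m1 = new ConcreteModel("1");
        AbstractDataModel m2 = new ConcreteModel("2");
        System.out.println(of(m1, m2).map(GET_VALUE).items()); // prints [1, 2]

        System.out.println(ofType(AbstractDataModel.class,
                new ConcreteModel("a"), new ConcreteModel("b")).map(GET_VALUE).items()); // prints [a, b]
    }
}
```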

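A simpler option is to pass TypeInformation.of(String.class) as the second argument to map. MyMapFunction then does not need to implement ResultTypeQueryable. The elements still need to be declared as AbstractDataModel, as above: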
DataStream<String> ds = env
    .fromElements(concreteModel01, concreteModel02)
    .map(new MyMapFunction(), TypeInformation.of(String.class));

Then simply use your interface together with its implementing class:

public interface AbstractDataModel {
    public String getValue();
}
public class ConcreteModel implements AbstractDataModel {
    private String key;
    private String value;
    public ConcreteModel() {
    }
    public ConcreteModel(String key, String value) {
        this.key = key;
        this.value = value;
    }
    public String getKey() {
        return key;
    }
    public void setKey(String key) {
        this.key = key;
    }
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
}