Apache Storm 中的自定义序列化

Custom serialization in Apache Storm

我尝试为 Apache Storm Spouts/Bolts 中使用的对象添加自定义序列化程序。现在我的代码看起来像这样:

conf.registerSerialization(MyService.class, MyKryoSerializer.class);

public class MyKryoSerializer extends Serializer<MyService> {

    public MyKryoSerializer() {
        System.out.println("New MyKryoSerializaer!");
    }

    @Override
    public void write(Kryo kryo, Output output, MyService service) {
        System.out.println(72);
    }

    @Override
    public MyService read(Kryo kryo, Input input, Class<MyService> aClass) {
        System.out.println(73);
        return null;
    }

    public MyService copy(Kryo kryo, MyService myService) {
        System.out.println("MyService!");
        return myService;
    }
}

public class MyRandomSpout extends BaseRichSpout {

    private MyService service;

    private SpoutOutputCollector collector;

...

因此,当我尝试在我的本地集群上启动 Storm 拓扑时,我将在 stdout 中看到 "New MyKryoSerializaer!",因此调用了构造函数,但未调用 write/read 方法。谁能告诉我,我做错了什么? Storm 是否支持用于 spots/bolts 序列化的 Skyo 序列化程序?

为了让 Storm 使用我自己的序列化逻辑,我必须将以下配置添加到我的拓扑中:

config.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, true);

其中 config 是我最终传递给提交方法的配置。

Storm 似乎不支持 Kryo 序列化 bolts 或 spouts。请参阅 this line,它明确尝试使用 Java 序列化进行序列化。