Vert.x 4 事件总线使用相同的编解码器序列化多个 类

Vert.x 4 eventbus serialize multiple classes with same codec

有没有办法为多个 classes 注册编解码器?基本上,我所有的 classes 都应该使用 Jackson 对象映射器进行序列化。但似乎我必须为每个 class 创建一个自定义编解码器(尽管我可以使用泛型对其进行一些抽象)。

一个小代码示例:

编解码器:

class JacksonCodec<T>(private val mapper: ObjectMapper, private val clazz: Class<T>) : MessageCodec<T, T> {
    override fun encodeToWire(buffer: Buffer, s: T) {
        buffer.appendBytes(mapper.writeValueAsBytes(s))
    }

    override fun decodeFromWire(pos: Int, buffer: Buffer): T {
        val length = buffer.getInt(pos)
        val bytes = buffer.getBytes(pos + 4, pos + 4 + length)
        return mapper.readValue(bytes, clazz)
    }
    ...
}

为每个class注册编解码器我想序列化:

vertx.eventBus()
        .registerDefaultCodec(A::class.java, JacksonCodec(DatabindCodec.mapper(), A::class.java))
    vertx.eventBus()
vertx.eventBus()
        .registerDefaultCodec(B::class.java, JacksonCodec(DatabindCodec.mapper(), B::class.java))
    vertx.eventBus()

代码示例是 kotlin 但同样适用于 Java。

据我看代码,没有办法,因为 class 需要完全匹配:

https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/CodecManager.java#L99

这是可能的,但有一些限制和怪癖。我不建议这样做。

让我们从限制开始:

  1. 不能在集群模式下使用
  2. 每次通过事件总线发送内容时都必须声明编解码器名称。

如果您创建一个使用 Jackson 编码 类 的通用编解码器,并且每次您通过事件总线发送内容时,您都确保使用 deliveryOptions 中的 codecName 添加它,您只需注册一次即可用于您的所有 类.

完整示例:

fun main() {
  val vertx = Vertx.vertx()
  vertx.eventBus().registerCodec(GenericCodec())

  vertx.eventBus().consumer<Foo>("test-address") {
    println(it.body())
    it.reply(Bar(), genericDeliveryOptions)
  }

  vertx.eventBus().request<String>("test-address", Foo(), genericDeliveryOptions) {
    println(it.result().body())
  }

  vertx.close()
}

data class Foo(
  val foo: String = "foo",
)

data class Bar(
  val bar: String = "bar",
)

class GenericCodec : MessageCodec<Any, Any> {
  companion object {
    const val NAME = "generic"
  }

  private val mapper: ObjectMapper = ObjectMapper()

  override fun encodeToWire(buffer: Buffer, s: Any) {
    buffer.appendBytes(mapper.writeValueAsBytes(s))
  }

  override fun decodeFromWire(pos: Int, buffer: Buffer): Any {
    throw RuntimeException("should never get here, unless using clustered mode")
  }

  override fun transform(s: Any): Any {
    return s
  }

  override fun name(): String {
    return NAME
  }

  override fun systemCodecID(): Byte {
    return -1
  }
}

val genericDeliveryOptions = deliveryOptionsOf(codecName = GenericCodec.NAME)