具有领域事件的 Kafka

Kafka with Domain Events

在我的事件驱动项目中,我有 Commands 类型的消息,作为响应,我有 Events.

这些 CommandsEvents 消息表示域,因此它们包含域中的复杂类型。

示例:

RegisterClientCommand(Name, Email)

ClientRegisteredEvent(ClientId)

域中还有数十个这样的命令和事件对。

我在想这样的事情:

RawMessage(payloadMap, sequenceId, createdOn)

有效负载将包含消息域 class 类型名称和消息字段。

我也在阅读有关 Avro 格式的内容,但似乎需要做很多工作来定义每条消息的消息格式。

就实际通过 Kafka 代理传输的消息格式而言,最佳做法是什么?

没有单一的 "best" 方法,这完全取决于您 team/organization 的专业知识以及您项目的具体要求。

Kafka 本身对消息实际包含的内容无动于衷。大多数时候,它只是将消息值和键视为不透明的字节数组。

无论您最终将 RawMessage 定义为 Java 端的什么,都必须将其序列化为字节数组以将其生成到 Kafka 中,因为这就是 KafkaProducer 所需要的.也许它是您已经拥有的自定义字符串序列化程序,也许您可​​以使用 Jackson 或类似的东西将 POJO 序列化为 JSON。或者您可能只是发送一个巨大的逗号分隔字符串作为消息。完全由您决定。

重要的是,消费者在从kafka主题中拉取消息时,能够正确可靠地读取消息中各个字段的数据,不会出现任何错误、版本冲突等情况。大多数serde/schema 存在的机制,如 Avro、Protobuf 或 Thrift,试图让您更轻松地完成这项工作。特别复杂的事情,例如确保新消息与同一消息的先前版本向后兼容。

  • 大多数人最终会得到以下组合:
    • 用于创建字节数组以生成到 Kafka 中的 Serde 机制,一些流行的是 AvroProtobuf节俭.
    • 原始 JSON 字符串
    • 具有某种 internal/custom 格式的巨大字符串 parsed/analyzed.
  • 有些公司使用集中式架构服务。这样一来,您的数据消费者不必提前知道消息包含什么模式,他们只需拉下消息,并从服务中请求相应的模式。 Confluent 有自己的自定义模式注册表解决方案,多年来一直支持 Avro,并且从几周前开始,现在正式支持 Protobuf。这 不需要 ,如果您拥有 producer/consumer 端到端,您可能会决定自己处理序列化,但很多人已经习惯了.
  • 根据消息类型,有时您需要压缩,因为消息可能非常重复 and/or一些 CPU 使用和延迟的成本。这也可以在 producer/consumer 端由您自己处理,在序列化后压缩字节数组,或者您可以直接在生产者端请求消息压缩(在 Kafka 文档中查找 compression.type ).