具有领域事件的 Kafka
Kafka with Domain Events
在我的事件驱动项目中,我有 Commands
类型的消息,作为响应,我有 Events
.
这些 Commands
和 Events
消息表示域,因此它们包含域中的复杂类型。
示例:
RegisterClientCommand(Name, Email)
ClientRegisteredEvent(ClientId)
域中还有数十个这样的命令和事件对。
我在想这样的事情:
RawMessage(payloadMap, sequenceId, createdOn)
有效负载将包含消息域 class 类型名称和消息字段。
我也在阅读有关 Avro 格式的内容,但似乎需要做很多工作来定义每条消息的消息格式。
就实际通过 Kafka 代理传输的消息格式而言,最佳做法是什么?
没有单一的 "best" 方法,这完全取决于您 team/organization 的专业知识以及您项目的具体要求。
Kafka 本身对消息实际包含的内容无动于衷。大多数时候,它只是将消息值和键视为不透明的字节数组。
无论您最终将 RawMessage
定义为 Java 端的什么,都必须将其序列化为字节数组以将其生成到 Kafka 中,因为这就是 KafkaProducer
所需要的.也许它是您已经拥有的自定义字符串序列化程序,也许您可以使用 Jackson 或类似的东西将 POJO 序列化为 JSON。或者您可能只是发送一个巨大的逗号分隔字符串作为消息。完全由您决定。
重要的是,消费者在从kafka主题中拉取消息时,能够正确可靠地读取消息中各个字段的数据,不会出现任何错误、版本冲突等情况。大多数serde/schema 存在的机制,如 Avro、Protobuf 或 Thrift,试图让您更轻松地完成这项工作。特别复杂的事情,例如确保新消息与同一消息的先前版本向后兼容。
- 大多数人最终会得到以下组合:
- 用于创建字节数组以生成到 Kafka 中的 Serde 机制,一些流行的是 Avro、Protobuf、节俭.
- 原始 JSON 字符串
- 具有某种 internal/custom 格式的巨大字符串 parsed/analyzed.
- 有些公司使用集中式架构服务。这样一来,您的数据消费者不必提前知道消息包含什么模式,他们只需拉下消息,并从服务中请求相应的模式。 Confluent 有自己的自定义模式注册表解决方案,多年来一直支持 Avro,并且从几周前开始,现在正式支持 Protobuf。这 不需要 ,如果您拥有 producer/consumer 端到端,您可能会决定自己处理序列化,但很多人已经习惯了.
- 根据消息类型,有时您需要压缩,因为消息可能非常重复 and/or一些 CPU 使用和延迟的成本。这也可以在 producer/consumer 端由您自己处理,在序列化后压缩字节数组,或者您可以直接在生产者端请求消息压缩(在 Kafka 文档中查找
compression.type
).
在我的事件驱动项目中,我有 Commands
类型的消息,作为响应,我有 Events
.
这些 Commands
和 Events
消息表示域,因此它们包含域中的复杂类型。
示例:
RegisterClientCommand(Name, Email)
ClientRegisteredEvent(ClientId)
域中还有数十个这样的命令和事件对。
我在想这样的事情:
RawMessage(payloadMap, sequenceId, createdOn)
有效负载将包含消息域 class 类型名称和消息字段。
我也在阅读有关 Avro 格式的内容,但似乎需要做很多工作来定义每条消息的消息格式。
就实际通过 Kafka 代理传输的消息格式而言,最佳做法是什么?
没有单一的 "best" 方法,这完全取决于您 team/organization 的专业知识以及您项目的具体要求。
Kafka 本身对消息实际包含的内容无动于衷。大多数时候,它只是将消息值和键视为不透明的字节数组。
无论您最终将 RawMessage
定义为 Java 端的什么,都必须将其序列化为字节数组以将其生成到 Kafka 中,因为这就是 KafkaProducer
所需要的.也许它是您已经拥有的自定义字符串序列化程序,也许您可以使用 Jackson 或类似的东西将 POJO 序列化为 JSON。或者您可能只是发送一个巨大的逗号分隔字符串作为消息。完全由您决定。
重要的是,消费者在从kafka主题中拉取消息时,能够正确可靠地读取消息中各个字段的数据,不会出现任何错误、版本冲突等情况。大多数serde/schema 存在的机制,如 Avro、Protobuf 或 Thrift,试图让您更轻松地完成这项工作。特别复杂的事情,例如确保新消息与同一消息的先前版本向后兼容。
- 大多数人最终会得到以下组合:
- 用于创建字节数组以生成到 Kafka 中的 Serde 机制,一些流行的是 Avro、Protobuf、节俭.
- 原始 JSON 字符串
- 具有某种 internal/custom 格式的巨大字符串 parsed/analyzed.
- 有些公司使用集中式架构服务。这样一来,您的数据消费者不必提前知道消息包含什么模式,他们只需拉下消息,并从服务中请求相应的模式。 Confluent 有自己的自定义模式注册表解决方案,多年来一直支持 Avro,并且从几周前开始,现在正式支持 Protobuf。这 不需要 ,如果您拥有 producer/consumer 端到端,您可能会决定自己处理序列化,但很多人已经习惯了.
- 根据消息类型,有时您需要压缩,因为消息可能非常重复 and/or一些 CPU 使用和延迟的成本。这也可以在 producer/consumer 端由您自己处理,在序列化后压缩字节数组,或者您可以直接在生产者端请求消息压缩(在 Kafka 文档中查找
compression.type
).