我们如何在 Kafka pub/sub 中对消息进行版本控制?
How can we version message in Kafka pub/sub?
我在使用 Apache Kafka 时遇到一个问题,详情如下。
我构建了一个服务的 3 个实例,它们有自己的本地缓存,分别称为 S1、S2、S3。
例如,当更新请求到达 S1 时,S1 将更新自己的本地缓存,然后将作业推送到 Kafka 以更新 S2、S3 的本地缓存。
在这种情况下,我只想让 S2、S3 得到这份工作,但实际上当我将 Kafka 用于 pub/sub 所有 S1、S2、S3 都会得到这份工作。
那么我该如何处理这个问题,任何建议都会有所帮助。
我看到 3 个选项:
- 当更新请求到来时,将其放入kafka并仅通过消耗的消息更新缓存。
- 您可以将 meta-information 添加到 kafka-key 并按该关键字过滤消息。
例如:
key:instanceName=S1,
如果 currentInstanceName != instanceName do_job
注意:密钥用于分区程序 - key/round robin 的 hashCode(取决于 kafka 版本),因此行为可能会改变。
包装您的 kafka 消息并将 meta-information 添加到包装器。例如,如果您使用 json:
{
"instanceName":"S1",
"request":{...}
}
我在使用 Apache Kafka 时遇到一个问题,详情如下。
我构建了一个服务的 3 个实例,它们有自己的本地缓存,分别称为 S1、S2、S3。 例如,当更新请求到达 S1 时,S1 将更新自己的本地缓存,然后将作业推送到 Kafka 以更新 S2、S3 的本地缓存。 在这种情况下,我只想让 S2、S3 得到这份工作,但实际上当我将 Kafka 用于 pub/sub 所有 S1、S2、S3 都会得到这份工作。
那么我该如何处理这个问题,任何建议都会有所帮助。
我看到 3 个选项:
- 当更新请求到来时,将其放入kafka并仅通过消耗的消息更新缓存。
- 您可以将 meta-information 添加到 kafka-key 并按该关键字过滤消息。 例如: key:instanceName=S1, 如果 currentInstanceName != instanceName do_job
注意:密钥用于分区程序 - key/round robin 的 hashCode(取决于 kafka 版本),因此行为可能会改变。
包装您的 kafka 消息并将 meta-information 添加到包装器。例如,如果您使用 json:
{ "instanceName":"S1", "request":{...} }