Pub/Sub 模式如何在有 IntegrationEvents 的情况下解耦微服务

How Pub/Sub pattern decouples microservices while there are IntegrationEvents

我一直在寻找微服务之间的异步通信模式,它确保微服务之间解耦。然后我遇到了 Microsoft 的 eShopOnContainers 项目,它解释了如何实现 Pub/Sub 模式,内容如下:

The integration events can be defined at the application level of each microservice, so they are decoupled from other microservices, ... What is not recommended is sharing a common integration events library across multiple microservices; ... [REF]

在考虑集成事件的实现以及服务如何订阅或发布它们时,这有点令人困惑。例如,集成事件 ProductPriceChangedIntegrationEventCatalog API 中实现如下:

namespace Microsoft.eShopOnContainers.Services.Catalog.API.IntegrationEvents.Events
{
    public class ProductPriceChangedIntegrationEvent : IntegrationEvent
    {        
        public int ProductId { get; private set; }

        public decimal NewPrice { get; private set; }

        public decimal OldPrice { get; private set; }

        public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice)
        {
            ProductId = productId;
            NewPrice = newPrice;
            OldPrice = oldPrice;
        }
    }
}

如果产品价格发生变化,Catalog 微服务会发布 ProductPriceChangedIntegrationEvent 事件,如下所示:

var priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id, productToUpdate.Price, oldPrice);
await _catalogIntegrationEventService.SaveEventAndCatalogContextChangesAsync(priceChangedEvent);
await _catalogIntegrationEventService.PublishThroughEventBusAsync(priceChangedEvent);

[REF]

当我检查其他微服务如何 subscribe 即使在保持 "decoupled" 时,这变得很有趣。事实证明,订阅此事件的服务实现了集成事件的精确副本并订阅了!!

例如; Basket miroservice 有一个 ProductPriceChangedIntegrationEvent 的实现,如下所示:

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.Events
{
    public class ProductPriceChangedIntegrationEvent : IntegrationEvent
    {        
        public int ProductId { get; private set; }

        public decimal NewPrice { get; private set; }

        public decimal OldPrice { get; private set; }

        public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice)
        {
            ProductId = productId;
            NewPrice = newPrice;
            OldPrice = oldPrice;
        }
    }
}

[REF]

并且它订阅了 ProductPriceChangedIntegrationEvent 事件如下:

private void ConfigureEventBus(IApplicationBuilder app)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

    eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
    eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>();
}

[REF]

值得注意的是,ProductPriceChangedIntegrationEvent 指的是 Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.Events 而非 Microsoft.eShopOnContainers.Services.Catalog.API.IntegrationEvents.Events 的实现。

问题:

这是否意味着每个微服务都必须 "cloned" 实施他们想要订阅的集成事件?

Does it mean every microservice has to have a "cloned" implementation of the integration event they want to subscribe?

没错。每个微服务都必须订阅它感兴趣的事件。然后他们会使用他们想要的事件!

so, if any change happens on the publisher, all the subscriber microserverices need to update their integration event accordingly? how is that "decoupled" when they are so dependent of each others implementation? (ignoring backward-compatible changes for sake of clarity)

微服务架构提倡 "highly cohesive" 的解耦服务。意味着他们都是独立的,但同时他们必须作为一个整体工作,为此他们之间必须有合同。

解耦意味着每个服务都足够自治,可以独立增长。他们可以独立选择他们的语言、实践、数据库和规模,但如果它取决于外部集成事件,那么是的,必须有某种类型的合同,如果发布者更改合同,则必须进行沟通。

我们一般不会更改现有合同,而是发布新合同。就像当我们公开正在被外部各方使用的 API 时,我们不会更改合同,而是公开新版本,这样现有客户就不会受到影响。

希望对您有所帮助!

当我第一次开始使用微服务时,事实上,当我第一次开始研究 eShopOnContainers 示例项目时,我也有类似的担忧。虽然这是一个很好的示例,但挑战在于它本质上是一个构建在(主要)一个技术平台上的大项目,因此甚至在您开始深入研究之前,它就已经具有与之紧密耦合的味道。

想象一下,在您提到的情况下,如果 Catalog API 是用 - 比如说 - Python(或任何 non-C#)编写的,并且位于不同的存储库中。可能(或可能不)仍然存在 ProductPriceChangedIntegrationEvent class - 但微服务仍然可以轻松生成相同的集成事件并将其发布到消息总线。从任何订阅应用程序中,它都无法区分事件的生成方式或使用的技术。

That's the loose-coupling part in action. Even though the subscribing service depends on a consistent shape/payload in the integration message, it does not know or care about how that message came to be.

这种灵活性是微服务优势的重要组成部分。想要为 Catalog API 更改部分或整个技术堆栈?去吧。只要遵守现有的 合同 ,系统就永远不会知道已进行更改。

因此,任何微服务与外界通信的方式代表了它的契约。在我们的应用程序中,这不仅包含它发布的事件中包含的信息,还包含它公开的 API 端点中的信息。好消息——至少以我目前的经验来看——是因为每个微服务的领域既小又well-defined,很容易不必在启动后返回并更改合同。我们有几次,所做的更改是添加 fields/endpoints 并且 100% 向后兼容。

Does it mean every microservice has to have a "cloned" implementation of the integration event they want to subscribe?

每个服务只需要能够从约定的message/payload中成功提取它需要的特定信息即可。这可能只是整个消息负载的一小部分,也可能是全部。

事实上,这与应用程序已经使用的任何外部 API 情况相同。您指望 API 提供商遵守他们发布的合同,并在任何重大更改发生之前提前警告您。每个微服务必须以相同的方式对待。

总有一天情况并非如此,那时我们不得不违约,因此我们使用自定义 Headers 内置了 API 版本控制。 If/when 我们确实违约了,我们将能够在过渡到新合同时优雅地弃用旧合同。

知道微服务最终 可能 改变契约也是我们在使用服务时努力使用 Facade Pattern 的原因。我们还确保任何时候我们从消息或 API 响应有效负载(我们使用 JSON,但还有其他)转换为具体的 class,我们可以容忍额外的字段 - 我相信这在 JSON 风格指南中,但无论模型抽象技术如何,这都是一个很好的做法。

一开始它确实看起来很可怕和脆弱,但如果每个人都遵守关于履行合同、为最终变更做计划以及基本上符合整体应用指南的规则,它就会非常强大。我强烈建议在开始构建服务之前先使用明确的风格指南。像这样的东西:

  • API 端点命名和版本控制
  • 消息路由/结构
  • Date/Time 格式和本地化

这些都可以独立处理,但随着服务数量的增加,处理消费应用程序的人员会更快乐(并且更有效率),而不必为每项服务跳过不同的环节。