Rebus 4.0:如何使用 ISagaStorage 和 ISubscriptionStorage 的自定义实现

Rebus 4.0: How to use custom implementations of ISagaStorage and ISubscriptionStorage

我想在将 saga 数据和订阅数据存储到数据库之前先对其进行处理,因此我分别实现了这两个接口,如下所示:

ISagaStorage

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using log4net;
using Newtonsoft.Json;
using Rebus.Sagas;

namespace Rebus.SagaExtension
{
    /// <summary>
    /// Custom <see cref="ISagaStorage"/> implementation. The actual persistence logic is elided
    /// in this snippet (see the "Some implementation" placeholders), so the member documentation
    /// below only describes what the visible code does.
    /// </summary>
    public class NDbSagaPersister : ISagaStorage
    {
        // Shared logger for this type; readonly so it cannot be reassigned after initialization.
        static readonly ILog _log = LogManager.GetLogger(typeof(NDbSagaPersister));

        /// <summary>
        /// Creates the persister and logs that a connection is being established.
        /// </summary>
        public NDbSagaPersister()
        {
            _log.Info("establishing connection");
        }

        /// <summary>
        /// Inserts the given saga data. The work is scheduled via <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to insert.</param>
        /// <param name="correlationProperties">The correlation properties supplied by the caller.</param>
        /// <returns>A task that completes when the scheduled work has finished.</returns>
        /// <remarks>
        /// NOTE(review): the original comment mentioned creating unique indexes; nothing in the
        /// visible code does that - confirm against the real implementation.
        /// </remarks>
        public Task Insert(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Updates the given saga data. The work is scheduled via <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to update.</param>
        /// <param name="correlationProperties">The correlation properties supplied by the caller.</param>
        /// <returns>A task that completes when the scheduled work has finished.</returns>
        /// <remarks>
        /// NOTE(review): the original comment mentioned an optimistic lock; no such check is
        /// visible here - confirm against the real implementation.
        /// </remarks>
        public Task Update(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Deletes the given saga data. The work is scheduled via <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to delete.</param>
        /// <returns>A task that completes when the scheduled work has finished.</returns>
        /// <remarks>
        /// NOTE(review): the original comment said an optimistic locking exception is thrown when
        /// not exactly one document was deleted; that is not visible here - confirm.
        /// </remarks>
        public Task Delete(ISagaData sagaData)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Looks up saga data of the given type by the value of one of its properties.
        /// </summary>
        /// <param name="sagaDataType">The saga data type to look for.</param>
        /// <param name="propertyName">The name of the property to match on.</param>
        /// <param name="propertyValue">The value the property must have.</param>
        /// <returns>
        /// A completed task holding the saga data, or <c>null</c> if the lookup did not assign one.
        /// </returns>
        /// <remarks>
        /// A failure during the lookup is logged and rethrown instead of being reported as
        /// "not found".
        /// </remarks>
        public Task<ISagaData> Find(Type sagaDataType, string propertyName, object propertyValue)
        {
            ISagaData sagaData = null;
            try
            {
                //Some implementation
            }
            catch (Exception ex)
            {
                // Pass the exception itself so the stack trace ends up in the log, then rethrow:
                // returning null here would tell the caller that no saga exists even though the
                // lookup actually failed.
                _log.Error("Saga Data Exception " + ex.Message, ex);
                throw;
            }
            return Task.FromResult<ISagaData>(sagaData);
        }
    }
}

以及 ISubscriptionStorage 的实现,如下所示:

using System;
using System.Linq;
using Rebus.Subscriptions;
using Microsoft.Win32;
using System.Threading.Tasks;
using log4net;

namespace Rebus.SagaExtension
{
    /// <summary>
    /// Custom <see cref="ISubscriptionStorage"/> implementation. The actual persistence logic is
    /// elided in this snippet (see the "Some implementation" placeholders).
    /// </summary>
    public class NDbSubscriptionStorage : ISubscriptionStorage
    {
        // Shared logger for this type; readonly so it cannot be reassigned after initialization.
        static readonly ILog _log = LogManager.GetLogger(typeof(NDbSubscriptionStorage));

        /// <summary>
        /// Always <c>true</c> for this storage.
        /// NOTE(review): presumably this marks the storage as shared between all endpoints -
        /// confirm against the Rebus documentation.
        /// </summary>
        public bool IsCentralized => true;

        /// <summary>
        /// Gets the addresses subscribed to the given topic.
        /// </summary>
        /// <param name="topic">The topic to look up subscribers for.</param>
        /// <returns>
        /// A completed task holding the subscriber addresses; an empty array (never <c>null</c>)
        /// when the lookup produced nothing.
        /// </returns>
        public Task<string[]> GetSubscriberAddresses(string topic)
        {
            string[] subs = null;
            //Some implementation

            // Hand back an empty array rather than null so callers can enumerate the result
            // without a null check.
            return Task.FromResult<string[]>(subs ?? new string[0]);
        }

        /// <summary>
        /// Removes the given subscriber from the given topic. The work is scheduled via
        /// <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="topic">The topic to unsubscribe from.</param>
        /// <param name="subscriberAddress">The address of the subscriber to remove.</param>
        /// <returns>A task that completes when the scheduled work has finished.</returns>
        public Task UnregisterSubscriber(string topic, string subscriberAddress)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Adds the given subscriber to the given topic. The work is scheduled via
        /// <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="topic">The topic to subscribe to.</param>
        /// <param name="subscriberAddress">The address of the subscriber to add.</param>
        /// <returns>A task that completes when the scheduled work has finished.</returns>
        public Task RegisterSubscriber(string topic, string subscriberAddress)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }
    }
}

现在,在配置 Rebus 时应该如何使用它们?

/// <summary>
/// Windsor installer that configures Rebus on top of the given container and starts it.
/// </summary>
public class RebusInstaller : IWindsorInstaller
{
    /// <summary>
    /// Builds the Rebus configuration (handler ordering, logging, MSMQ transport, routing,
    /// saga/subscription/timeout storage, outgoing-message headers) and starts it.
    /// </summary>
    /// <param name="container">The Windsor container Rebus is wired into.</param>
    /// <param name="store">The Windsor configuration store; not used here.</param>
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        var containerAdapter = new CastleWindsorContainerAdapter(container);

        Configure.With(containerAdapter)
            .Options(o => o.SpecifyOrderOfHandlers().First<AuthenticationHandler>())
            .Logging(l => l.Log4Net())
            .Transport(t => t.UseMsmq(ConfigurationManager.AppSettings["InputQueue"]))
            .Routing(r => r.TypeBasedRoutingFromAppConfig())
            // Open question from the author: how to plug in the custom saga storage?
            .Sagas(m => m.StoreInMySagaDb())
            // Open question from the author: how to plug in the custom subscription storage?
            .Subscriptions(m => m.StoreInMySubscriberDb())
            .Timeouts(m => m.StoreInMemory())
            .Events(e =>
            {
                e.BeforeMessageSent += (bus, headers, message, context) =>
                {
                    // Use the configured value, or 30 seconds when the setting is missing/empty.
                    var configuredValue = ConfigurationManager.AppSettings["TimeToBeReceived"];
                    var timeToBeReceived = string.IsNullOrEmpty(configuredValue)
                        ? "00:00:30"
                        : configuredValue;

                    // "-1" means: leave the outgoing headers untouched.
                    if (timeToBeReceived == "-1")
                    {
                        return;
                    }

                    headers.Add(Headers.TimeToBeReceived, timeToBeReceived);
                    headers.Add(Headers.Express, "rebus-express");
                };
            })
            .Start();
    }
}

请建议在这种情况下配置 saga 和订阅者所需的步骤。

您可以这样实现 StoreInMySagaDb:

/// <summary>
/// Configuration extensions that plug a custom saga storage into the Rebus configuration API.
/// </summary>
public static class MyRebusConfigurationExtensions
{
    /// <summary>
    /// Registers a factory that creates a new <c>MySagaStorage</c> as the saga storage.
    /// </summary>
    /// <param name="configurer">The saga storage configurer to register with.</param>
    public static void StoreInMySagaDb(this StandardConfigurer<ISagaStorage> configurer)
        => configurer.Register(resolutionContext => new MySagaStorage());
}

还可以在创建时注入依赖,例如像下面这样注入 Rebus 当前配置的日志记录器工厂:

/// <summary>
/// Configuration extensions that plug a custom saga storage into the Rebus configuration API.
/// </summary>
public static class MyRebusConfigurationExtensions
{
    /// <summary>
    /// Registers a factory that creates a new <c>MySagaStorage</c> as the saga storage, passing
    /// it the <c>IRebusLoggerFactory</c> obtained from the resolution context.
    /// </summary>
    /// <param name="configurer">The saga storage configurer to register with.</param>
    public static void StoreInMySagaDb(this StandardConfigurer<ISagaStorage> configurer)
    {
        configurer.Register(resolutionContext =>
        {
            var loggerFactory = resolutionContext.Get<IRebusLoggerFactory>();
            return new MySagaStorage(loggerFactory);
        });
    }
}

您可以对订阅存储使用相同的方法,只需为 StandardConfigurer<ISubscriptionStorage> 编写一个扩展方法即可。