Rebus 4.0:如何使用 ISagaStorage 和 ISubscriptionStorage 的自定义实现
Rebus 4.0: How to use customized implementation of ISagaStorage and ISubscriptionStorage
我想在将 saga 和订阅数据存储到数据库之前对其进行操作,因此我实现了它们的两个接口,如下所示,
ISagaStorage
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using log4net;
using Newtonsoft.Json;
using Rebus.Sagas;
namespace Rebus.SagaExtension
{
    /// <summary>
    /// Custom <see cref="ISagaStorage"/> implementation that lets saga data be manipulated
    /// before it is persisted. The actual storage work is left as placeholders.
    /// </summary>
    public class NDbSagaPersister : ISagaStorage
    {
        // Shared log4net logger; assigned exactly once in the static constructor.
        static readonly ILog _log;

        static NDbSagaPersister()
        {
            _log = LogManager.GetLogger(typeof(NDbSagaPersister));
        }

        /// <summary>
        /// Creates the persister and logs that a connection is being established.
        /// </summary>
        public NDbSagaPersister()
        {
            _log.Info("establishing connection");
        }

        /// <summary>
        /// Inserts the given saga data. The (placeholder) work is queued with <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to insert.</param>
        /// <param name="correlationProperties">The correlation properties declared for the saga.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        public Task Insert(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Updates the given saga data. The (placeholder) work is queued with <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to update.</param>
        /// <param name="correlationProperties">The correlation properties declared for the saga.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        /// <remarks>
        /// NOTE(review): the original comment promised an optimistic lock; nothing in this
        /// placeholder shows one - confirm once the real implementation is in place.
        /// </remarks>
        public Task Update(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Deletes the given saga data. The (placeholder) work is queued with <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to delete.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        /// <remarks>
        /// NOTE(review): the original comment said an exception is thrown unless exactly one
        /// record is deleted; the placeholder does not show that - confirm.
        /// </remarks>
        public Task Delete(ISagaData sagaData)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Looks up saga data of the given type by the value of one of its properties.
        /// </summary>
        /// <param name="sagaDataType">The type of saga data to look for.</param>
        /// <param name="propertyName">The name of the property to match on.</param>
        /// <param name="propertyValue">The value the property must have.</param>
        /// <returns>
        /// A completed task holding the saga data, or <c>null</c> when nothing was assigned
        /// by the lookup or the lookup threw.
        /// </returns>
        public Task<ISagaData> Find(Type sagaDataType, string propertyName, object propertyValue)
        {
            ISagaData sagaData = null;
            try
            {
                //Some implementation
            }
            catch (Exception ex)
            {
                // Hand the exception itself to log4net so the exception type and stack trace
                // are recorded, not just the message text.
                // NOTE(review): after a failure this still returns null, so the caller cannot
                // tell "lookup failed" from "no saga found" - confirm that is intended.
                _log.Error("Saga Data Exception " + ex.Message, ex);
            }
            return Task.FromResult<ISagaData>(sagaData);
        }
    }
}
和如下所示的 ISubscriptionStorage,
using System;
using System.Linq;
using Rebus.Subscriptions;
using Microsoft.Win32;
using System.Threading.Tasks;
using log4net;
namespace Rebus.SagaExtension
{
    /// <summary>
    /// Custom <see cref="ISubscriptionStorage"/> implementation that lets subscription data be
    /// manipulated before it is persisted. The actual storage work is left as placeholders.
    /// </summary>
    public class NDbSubscriptionStorage : ISubscriptionStorage
    {
        // Shared log4net logger; assigned exactly once in the static constructor.
        static readonly ILog _log;

        /// <summary>
        /// Always reports this storage as centralized.
        /// </summary>
        public bool IsCentralized => true;

        static NDbSubscriptionStorage()
        {
            _log = LogManager.GetLogger(typeof(NDbSubscriptionStorage));
        }

        /// <summary>
        /// Gets the addresses of the subscribers of the given topic.
        /// </summary>
        /// <param name="topic">The topic whose subscribers are requested.</param>
        /// <returns>
        /// A completed task holding the subscriber addresses; an empty array (never <c>null</c>)
        /// when the lookup produced nothing.
        /// </returns>
        public Task<string[]> GetSubscriberAddresses(string topic)
        {
            string[] subs = null;
            //Some implementation

            // Callers enumerate the result, so substitute an empty array for null rather than
            // letting a "no subscribers" outcome surface as a NullReferenceException.
            return Task.FromResult<string[]>(subs ?? new string[0]);
        }

        /// <summary>
        /// Removes the given subscriber from the given topic. The (placeholder) work is queued
        /// with <see cref="Task.Run(System.Action)"/>.
        /// </summary>
        /// <param name="topic">The topic to unsubscribe from.</param>
        /// <param name="subscriberAddress">The address of the subscriber to remove.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        public Task UnregisterSubscriber(string topic, string subscriberAddress)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Adds the given subscriber to the given topic. The (placeholder) work is queued
        /// with <see cref="Task.Run(System.Action)"/>.
        /// </summary>
        /// <param name="topic">The topic to subscribe to.</param>
        /// <param name="subscriberAddress">The address of the subscriber to add.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        public Task RegisterSubscriber(string topic, string subscriberAddress)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }
    }
}
现在,在配置 Rebus 时应该如何使用它们?
/// <summary>
/// Windsor installer that configures and starts the Rebus bus on top of the given container.
/// </summary>
public class RebusInstaller : IWindsorInstaller
{
    /// <summary>
    /// Builds the Rebus configuration (handler order, logging, transport, routing, storages,
    /// outgoing-message hook) and starts the bus.
    /// </summary>
    /// <param name="container">The Windsor container the bus is configured with.</param>
    /// <param name="store">The Windsor configuration store (not used here).</param>
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        Configure.With(new CastleWindsorContainerAdapter(container))
            .Options(options =>
            {
                options.SpecifyOrderOfHandlers()
                    .First<AuthenticationHandler>();
            })
            .Logging(logging => logging.Log4Net())
            .Transport(transport => transport.UseMsmq(ConfigurationManager.AppSettings["InputQueue"]))
            .Routing(routing => routing.TypeBasedRoutingFromAppConfig())
            .Sagas(sagas => sagas.StoreInMySagaDb()) // How?
            .Subscriptions(subscriptions => subscriptions.StoreInMySubscriberDb()) // How?
            .Timeouts(timeouts => timeouts.StoreInMemory())
            .Events(events =>
            {
                events.BeforeMessageSent += (bus, headers, message, context) =>
                {
                    // Fall back to 30 seconds when the app setting is missing or empty.
                    var configured = ConfigurationManager.AppSettings["TimeToBeReceived"];
                    var timeToBeReceived = string.IsNullOrEmpty(configured) ? "00:00:30" : configured;

                    // A configured value of "-1" means: add neither header.
                    if (timeToBeReceived == "-1")
                    {
                        return;
                    }

                    headers.Add(Headers.TimeToBeReceived, timeToBeReceived);
                    headers.Add(Headers.Express, "rebus-express");
                };
            })
            .Start();
    }
}
请建议在这种情况下配置 saga 和订阅者所需的步骤。
您可以这样实现 StoreInMySagaDb:
/// <summary>
/// Configuration extensions that plug the custom saga storage into Rebus.
/// </summary>
public static class MyRebusConfigurationExtensions
{
    /// <summary>
    /// Registers a factory that creates a new <c>MySagaStorage</c> as the saga storage.
    /// </summary>
    /// <param name="configurer">The saga storage configurer to register with.</param>
    public static void StoreInMySagaDb(this StandardConfigurer<ISagaStorage> configurer)
        => configurer.Register(context => new MySagaStorage());
}
还可以注入依赖,例如像下面这样注入 Rebus 当前配置的日志记录器工厂:
/// <summary>
/// Configuration extensions that plug the custom saga storage into Rebus.
/// </summary>
public static class MyRebusConfigurationExtensions
{
    /// <summary>
    /// Registers a factory that creates a new <c>MySagaStorage</c>, passing it the
    /// <c>IRebusLoggerFactory</c> obtained from the resolution context.
    /// </summary>
    /// <param name="configurer">The saga storage configurer to register with.</param>
    public static void StoreInMySagaDb(this StandardConfigurer<ISagaStorage> configurer)
    {
        configurer.Register(context =>
        {
            var loggerFactory = context.Get<IRebusLoggerFactory>();
            return new MySagaStorage(loggerFactory);
        });
    }
}
您可以对订阅存储使用相同的方法,只需为 StandardConfigurer<ISubscriptionStorage>
编写一个扩展方法即可。
我想在将 saga 和订阅数据存储到数据库之前对其进行操作,因此我实现了它们的两个接口,如下所示,
ISagaStorage
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using log4net;
using Newtonsoft.Json;
using Rebus.Sagas;
namespace Rebus.SagaExtension
{
    /// <summary>
    /// Custom <see cref="ISagaStorage"/> implementation that lets saga data be manipulated
    /// before it is persisted. The actual storage work is left as placeholders.
    /// </summary>
    public class NDbSagaPersister : ISagaStorage
    {
        // Shared log4net logger; assigned exactly once in the static constructor.
        static readonly ILog _log;

        static NDbSagaPersister()
        {
            _log = LogManager.GetLogger(typeof(NDbSagaPersister));
        }

        /// <summary>
        /// Creates the persister and logs that a connection is being established.
        /// </summary>
        public NDbSagaPersister()
        {
            _log.Info("establishing connection");
        }

        /// <summary>
        /// Inserts the given saga data. The (placeholder) work is queued with <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to insert.</param>
        /// <param name="correlationProperties">The correlation properties declared for the saga.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        public Task Insert(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Updates the given saga data. The (placeholder) work is queued with <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to update.</param>
        /// <param name="correlationProperties">The correlation properties declared for the saga.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        /// <remarks>
        /// NOTE(review): the original comment promised an optimistic lock; nothing in this
        /// placeholder shows one - confirm once the real implementation is in place.
        /// </remarks>
        public Task Update(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Deletes the given saga data. The (placeholder) work is queued with <see cref="Task.Run(Action)"/>.
        /// </summary>
        /// <param name="sagaData">The saga data instance to delete.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        /// <remarks>
        /// NOTE(review): the original comment said an exception is thrown unless exactly one
        /// record is deleted; the placeholder does not show that - confirm.
        /// </remarks>
        public Task Delete(ISagaData sagaData)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Looks up saga data of the given type by the value of one of its properties.
        /// </summary>
        /// <param name="sagaDataType">The type of saga data to look for.</param>
        /// <param name="propertyName">The name of the property to match on.</param>
        /// <param name="propertyValue">The value the property must have.</param>
        /// <returns>
        /// A completed task holding the saga data, or <c>null</c> when nothing was assigned
        /// by the lookup or the lookup threw.
        /// </returns>
        public Task<ISagaData> Find(Type sagaDataType, string propertyName, object propertyValue)
        {
            ISagaData sagaData = null;
            try
            {
                //Some implementation
            }
            catch (Exception ex)
            {
                // Hand the exception itself to log4net so the exception type and stack trace
                // are recorded, not just the message text.
                // NOTE(review): after a failure this still returns null, so the caller cannot
                // tell "lookup failed" from "no saga found" - confirm that is intended.
                _log.Error("Saga Data Exception " + ex.Message, ex);
            }
            return Task.FromResult<ISagaData>(sagaData);
        }
    }
}
和如下所示的 ISubscriptionStorage,
using System;
using System.Linq;
using Rebus.Subscriptions;
using Microsoft.Win32;
using System.Threading.Tasks;
using log4net;
namespace Rebus.SagaExtension
{
    /// <summary>
    /// Custom <see cref="ISubscriptionStorage"/> implementation that lets subscription data be
    /// manipulated before it is persisted. The actual storage work is left as placeholders.
    /// </summary>
    public class NDbSubscriptionStorage : ISubscriptionStorage
    {
        // Shared log4net logger; assigned exactly once in the static constructor.
        static readonly ILog _log;

        /// <summary>
        /// Always reports this storage as centralized.
        /// </summary>
        public bool IsCentralized => true;

        static NDbSubscriptionStorage()
        {
            _log = LogManager.GetLogger(typeof(NDbSubscriptionStorage));
        }

        /// <summary>
        /// Gets the addresses of the subscribers of the given topic.
        /// </summary>
        /// <param name="topic">The topic whose subscribers are requested.</param>
        /// <returns>
        /// A completed task holding the subscriber addresses; an empty array (never <c>null</c>)
        /// when the lookup produced nothing.
        /// </returns>
        public Task<string[]> GetSubscriberAddresses(string topic)
        {
            string[] subs = null;
            //Some implementation

            // Callers enumerate the result, so substitute an empty array for null rather than
            // letting a "no subscribers" outcome surface as a NullReferenceException.
            return Task.FromResult<string[]>(subs ?? new string[0]);
        }

        /// <summary>
        /// Removes the given subscriber from the given topic. The (placeholder) work is queued
        /// with <see cref="Task.Run(System.Action)"/>.
        /// </summary>
        /// <param name="topic">The topic to unsubscribe from.</param>
        /// <param name="subscriberAddress">The address of the subscriber to remove.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        public Task UnregisterSubscriber(string topic, string subscriberAddress)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }

        /// <summary>
        /// Adds the given subscriber to the given topic. The (placeholder) work is queued
        /// with <see cref="Task.Run(System.Action)"/>.
        /// </summary>
        /// <param name="topic">The topic to subscribe to.</param>
        /// <param name="subscriberAddress">The address of the subscriber to add.</param>
        /// <returns>A task that completes when the queued work has finished.</returns>
        public Task RegisterSubscriber(string topic, string subscriberAddress)
        {
            return Task.Run(() =>
            {
                //Some implementation
            });
        }
    }
}
现在,在配置 Rebus 时应该如何使用它们?
/// <summary>
/// Windsor installer that configures and starts the Rebus bus on top of the given container.
/// </summary>
public class RebusInstaller : IWindsorInstaller
{
    /// <summary>
    /// Builds the Rebus configuration (handler order, logging, transport, routing, storages,
    /// outgoing-message hook) and starts the bus.
    /// </summary>
    /// <param name="container">The Windsor container the bus is configured with.</param>
    /// <param name="store">The Windsor configuration store (not used here).</param>
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        Configure.With(new CastleWindsorContainerAdapter(container))
            .Options(options =>
            {
                options.SpecifyOrderOfHandlers()
                    .First<AuthenticationHandler>();
            })
            .Logging(logging => logging.Log4Net())
            .Transport(transport => transport.UseMsmq(ConfigurationManager.AppSettings["InputQueue"]))
            .Routing(routing => routing.TypeBasedRoutingFromAppConfig())
            .Sagas(sagas => sagas.StoreInMySagaDb()) // How?
            .Subscriptions(subscriptions => subscriptions.StoreInMySubscriberDb()) // How?
            .Timeouts(timeouts => timeouts.StoreInMemory())
            .Events(events =>
            {
                events.BeforeMessageSent += (bus, headers, message, context) =>
                {
                    // Fall back to 30 seconds when the app setting is missing or empty.
                    var configured = ConfigurationManager.AppSettings["TimeToBeReceived"];
                    var timeToBeReceived = string.IsNullOrEmpty(configured) ? "00:00:30" : configured;

                    // A configured value of "-1" means: add neither header.
                    if (timeToBeReceived == "-1")
                    {
                        return;
                    }

                    headers.Add(Headers.TimeToBeReceived, timeToBeReceived);
                    headers.Add(Headers.Express, "rebus-express");
                };
            })
            .Start();
    }
}
请建议在这种情况下配置 saga 和订阅者所需的步骤。
您可以这样实现 StoreInMySagaDb:
/// <summary>
/// Configuration extensions that plug the custom saga storage into Rebus.
/// </summary>
public static class MyRebusConfigurationExtensions
{
    /// <summary>
    /// Registers a factory that creates a new <c>MySagaStorage</c> as the saga storage.
    /// </summary>
    /// <param name="configurer">The saga storage configurer to register with.</param>
    public static void StoreInMySagaDb(this StandardConfigurer<ISagaStorage> configurer)
        => configurer.Register(context => new MySagaStorage());
}
还可以注入依赖,例如像下面这样注入 Rebus 当前配置的日志记录器工厂:
/// <summary>
/// Configuration extensions that plug the custom saga storage into Rebus.
/// </summary>
public static class MyRebusConfigurationExtensions
{
    /// <summary>
    /// Registers a factory that creates a new <c>MySagaStorage</c>, passing it the
    /// <c>IRebusLoggerFactory</c> obtained from the resolution context.
    /// </summary>
    /// <param name="configurer">The saga storage configurer to register with.</param>
    public static void StoreInMySagaDb(this StandardConfigurer<ISagaStorage> configurer)
    {
        configurer.Register(context =>
        {
            var loggerFactory = context.Get<IRebusLoggerFactory>();
            return new MySagaStorage(loggerFactory);
        });
    }
}
您可以对订阅存储使用相同的方法,只需为 StandardConfigurer<ISubscriptionStorage>
编写一个扩展方法即可。