使用 Azure 服务总线在 Rebus 中处理 Publish/Subscribe
Handle Publish/Subscribe in Rebus with Azure Service Bus
我正在研究 Rebus 并将其与 Azure 服务总线一起使用。将它与 regalure Queues 一起使用很容易,但是当我想改用 Topic 时,我无法让它工作。
这里有没有已经完成设置并与 Topic/Subscription 一起使用的人?这是我目前所拥有的。
static void Main(string[] args)
{
_bus1 = InitializeBus(System.Environment.MachineName);
_bus2 = InitializeBus(System.Environment.MachineName + "_2");
_bus3 = InitializeBus();
Run();
Console.WriteLine("Press Enter to exit!");
Console.ReadLine();
}
private static void Run()
{
try
{
_bus1.Handle<string>((b, c, m) => { Console.WriteLine(m); return null; });
_bus2.Handle<string>((b, c, m) => { Console.WriteLine(m); return null; });
_bus1.Bus.Subscribe<string>();
_bus2.Bus.Subscribe<string>();
_bus3.Bus.Publish("Publish test message");
}
catch (Exception ex)
{
throw;
}
}
private static BuiltinHandlerActivator InitializeBus(string queueName = null)
{
var activator = new BuiltinHandlerActivator();
if(string.IsNullOrEmpty(queueName))
Configure.With(activator)
.Transport(t => t.UseAzureServiceBusAsOneWayClient(connectionString))
.Options(o => { o.SetNumberOfWorkers(10); o.SetMaxParallelism(10); })
.Start();
else
Configure.With(activator)
.Transport(t => t.UseAzureServiceBus(connectionString, queueName).EnablePartitioning().DoNotCreateQueues())
.Options(o => { o.SetNumberOfWorkers(10); o.SetMaxParallelism(10); })
.Start();
return activator;
}
首先我创建了所有的总线。我正在使用 DontCreateQueues(),因为我不希望在我的根目录中重复创建队列,而只是在作为订阅的主题下创建。
然后我设置了总线并且发布工作正常,创建了一个主题并在该主题下创建了 2 个订阅,并且每个订阅中都有 1 条消息。但是消息永远不会被收集。
如果我删除配置中的 DontCreateQueues() 方法,代码可以工作,但随后会在根目录中创建 2 个队列和主题,这是 2 个订阅,但我不能那样做。
最好的问候
马格努斯
Rebus 通过为您订阅的每个主题创建订阅来使用主题,然后配置订阅以将收到的消息转发到总线的输入队列。
如果总线没有具有预期名称的输入队列(Rebus 创建的或您手动创建的),事情将无法进行。
存在 DontCreateQueues()
的原因是允许专家用户配置他们的队列设置,超出 Rebus 能够(和愿意)做的事情。不过,这需要非常详细地了解 Rebus 如何期望您的队列实体进行布局,所以我建议几乎所有人都不要手动创建任何东西,然后让 Rebus 简单地进行设置。
我正在研究 Rebus 并将其与 Azure 服务总线一起使用。将它与 regalure Queues 一起使用很容易,但是当我想改用 Topic 时,我无法让它工作。
这里有没有已经完成设置并与 Topic/Subscription 一起使用的人?这是我目前所拥有的。
static void Main(string[] args)
{
_bus1 = InitializeBus(System.Environment.MachineName);
_bus2 = InitializeBus(System.Environment.MachineName + "_2");
_bus3 = InitializeBus();
Run();
Console.WriteLine("Press Enter to exit!");
Console.ReadLine();
}
private static void Run()
{
try
{
_bus1.Handle<string>((b, c, m) => { Console.WriteLine(m); return null; });
_bus2.Handle<string>((b, c, m) => { Console.WriteLine(m); return null; });
_bus1.Bus.Subscribe<string>();
_bus2.Bus.Subscribe<string>();
_bus3.Bus.Publish("Publish test message");
}
catch (Exception ex)
{
throw;
}
}
private static BuiltinHandlerActivator InitializeBus(string queueName = null)
{
var activator = new BuiltinHandlerActivator();
if(string.IsNullOrEmpty(queueName))
Configure.With(activator)
.Transport(t => t.UseAzureServiceBusAsOneWayClient(connectionString))
.Options(o => { o.SetNumberOfWorkers(10); o.SetMaxParallelism(10); })
.Start();
else
Configure.With(activator)
.Transport(t => t.UseAzureServiceBus(connectionString, queueName).EnablePartitioning().DoNotCreateQueues())
.Options(o => { o.SetNumberOfWorkers(10); o.SetMaxParallelism(10); })
.Start();
return activator;
}
首先我创建了所有的总线。我正在使用 DontCreateQueues(),因为我不希望在我的根目录中重复创建队列,而只是在作为订阅的主题下创建。 然后我设置了总线并且发布工作正常,创建了一个主题并在该主题下创建了 2 个订阅,并且每个订阅中都有 1 条消息。但是消息永远不会被收集。
如果我删除配置中的 DontCreateQueues() 方法,代码可以工作,但随后会在根目录中创建 2 个队列和主题,这是 2 个订阅,但我不能那样做。
最好的问候 马格努斯
Rebus 通过为您订阅的每个主题创建订阅来使用主题,然后配置订阅以将收到的消息转发到总线的输入队列。
如果总线没有具有预期名称的输入队列(Rebus 创建的或您手动创建的),事情将无法进行。
存在 DontCreateQueues()
的原因是允许专家用户配置他们的队列设置,超出 Rebus 能够(和愿意)做的事情。不过,这需要非常详细地了解 Rebus 如何期望您的队列实体进行布局,所以我建议几乎所有人都不要手动创建任何东西,然后让 Rebus 简单地进行设置。