MassTransit pub/sub 与 MSMQ 使用
MassTransit pub/sub with MSMQ using
我刚开始使用 MassTransit,找不到适合初学者的任何合适的文档。
我确实在 http://looselycoupledlabs.com/2014/06/masstransit-publish-subscribe-example/ 找到了一些示例代码,其中显示了使用 RabbitMQ
的简单 MassTransit Publish/Subscribe 示例
但对于我的公司,我需要使用 MSMQ。
所以我删除了 RabbitMQ 引用:
x.UseRabbitMq();
x.ReceiveFrom("rabbitmq://localhost/MtPubSubExample_" + queueName);
并将它们改为使用 MSMQ:
x.UseMsmq();
x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);
我在启动订阅者和发布者时都没有错误,我可以在发布者处输入消息,但它们似乎没有到达订阅者,消费代码从未被调用。
配置:
namespace Configuration
{
public class BusInitializer
{
public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
{
Log4NetLogger.Use();
var bus = ServiceBusFactory.New(x =>
{
x.UseMsmq();
x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);
moreInitialization(x);
});
return bus;
}
}
}
出版商:
static void Main(string[] args)
{
var bus = BusInitializer.CreateBus("TestPublisher", x => { });
string text = "";
while (text != "quit")
{
Console.Write("Enter a message: ");
text = Console.ReadLine();
var message = new SomethingHappenedMessage() { What = text, When = DateTime.Now };
bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
}
bus.Dispose();
}
订阅者:
static void Main(string[] args)
{
var bus = BusInitializer.CreateBus("TestSubscriber", x =>
{
x.Subscribe(subs =>
{
subs.Consumer<SomethingHappenedConsumer>().Permanent();
});
});
Console.ReadKey();
bus.Dispose();
}
未被调用的消费者代码:
class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
{
public void Consume(IConsumeContext<SomethingHappened> message)
{
Console.Write("TXT: " + message.Message.What);
Console.Write(" SENT: " + message.Message.When.ToString());
Console.Write(" PROCESSED: " + DateTime.Now.ToString());
Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");
}
}
我还以为我可以看到存储在 msmq 中的消息,但专用队列是空的。
我现在正在敲头 2 天,一定是遗漏了一些明显的东西;非常感谢任何帮助。
我的环境:Windows8.1 Prof. with VS 2013 Prof.
缺少的部分是订阅分发 - 这是跟踪哪些消费者注册了哪些消息并路由消息的组件。
正如 docs 所说:
Once a subscription is created on a local bus, this information then
needs to be shared between all the different bus instances in your
application network.
Though the routing data is the same, how this information get to all
of the nodes is different depending on your transport configuration.
正如页面继续解释的那样,对于 MSMQ,您有两个选择:
- MSMQ 多播
- 使用订阅服务(MassTransit.RuntimeServices)
多播是only really meant for development and not for production. Note there are no permanent subscriptions and messages can be lost during startup
订阅服务(又名 MassTransit.RuntimeServices)是 not distributed with NuGet,因此您需要查找二进制文件或从源代码编译。它作为一项 Windows 服务运行,需要一个数据库来跟踪订阅。
我相信 none 当 运行 RabbitMQ 或 Azure 服务总线是即将推出的 3.0 版本中支持的传输(Rabbit MQ 在最新版本 2.x 比 MSMQ,版本 3 放弃了 MSMQ 支持)
我刚开始使用 MassTransit,找不到适合初学者的任何合适的文档。 我确实在 http://looselycoupledlabs.com/2014/06/masstransit-publish-subscribe-example/ 找到了一些示例代码,其中显示了使用 RabbitMQ
的简单 MassTransit Publish/Subscribe 示例但对于我的公司,我需要使用 MSMQ。 所以我删除了 RabbitMQ 引用:
x.UseRabbitMq();
x.ReceiveFrom("rabbitmq://localhost/MtPubSubExample_" + queueName);
并将它们改为使用 MSMQ:
x.UseMsmq();
x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);
我在启动订阅者和发布者时都没有错误,我可以在发布者处输入消息,但它们似乎没有到达订阅者,消费代码从未被调用。
配置:
namespace Configuration
{
public class BusInitializer
{
public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
{
Log4NetLogger.Use();
var bus = ServiceBusFactory.New(x =>
{
x.UseMsmq();
x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);
moreInitialization(x);
});
return bus;
}
}
}
出版商:
static void Main(string[] args)
{
var bus = BusInitializer.CreateBus("TestPublisher", x => { });
string text = "";
while (text != "quit")
{
Console.Write("Enter a message: ");
text = Console.ReadLine();
var message = new SomethingHappenedMessage() { What = text, When = DateTime.Now };
bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
}
bus.Dispose();
}
订阅者:
static void Main(string[] args)
{
var bus = BusInitializer.CreateBus("TestSubscriber", x =>
{
x.Subscribe(subs =>
{
subs.Consumer<SomethingHappenedConsumer>().Permanent();
});
});
Console.ReadKey();
bus.Dispose();
}
未被调用的消费者代码:
class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
{
public void Consume(IConsumeContext<SomethingHappened> message)
{
Console.Write("TXT: " + message.Message.What);
Console.Write(" SENT: " + message.Message.When.ToString());
Console.Write(" PROCESSED: " + DateTime.Now.ToString());
Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");
}
}
我还以为我可以看到存储在 msmq 中的消息,但专用队列是空的。
我现在正在敲头 2 天,一定是遗漏了一些明显的东西;非常感谢任何帮助。
我的环境:Windows8.1 Prof. with VS 2013 Prof.
缺少的部分是订阅分发 - 这是跟踪哪些消费者注册了哪些消息并路由消息的组件。
正如 docs 所说:
Once a subscription is created on a local bus, this information then needs to be shared between all the different bus instances in your application network.
Though the routing data is the same, how this information get to all of the nodes is different depending on your transport configuration.
正如页面继续解释的那样,对于 MSMQ,您有两个选择:
- MSMQ 多播
- 使用订阅服务(MassTransit.RuntimeServices)
多播是only really meant for development and not for production. Note there are no permanent subscriptions and messages can be lost during startup
订阅服务(又名 MassTransit.RuntimeServices)是 not distributed with NuGet,因此您需要查找二进制文件或从源代码编译。它作为一项 Windows 服务运行,需要一个数据库来跟踪订阅。
我相信 none 当 运行 RabbitMQ 或 Azure 服务总线是即将推出的 3.0 版本中支持的传输(Rabbit MQ 在最新版本 2.x 比 MSMQ,版本 3 放弃了 MSMQ 支持)