Rebus / 基于内容的路由

Rebus / Content based routing

如您所见,有一个总部作为根节点,一些分支机构作为子节点。有一条Data类型的消息,我想根据Data对象的内容发布一条消息,例如:

if (data.value == xxxx) publish(data, Br1, Br2)
else if (data.value == yyyy) publish(data, Br3, Br4)
else if (data.value == zzzz) publis(data, Br5, Br6)

这是 pub/sub 模式的某种定制版本。但是我想根据消息的内容将数据类型的消息发布给一些特殊的订阅者。

Rebus 有解决办法吗?

Rebus 中有几个解决方案:)

对于您的场景,我可以看到两种解决方法:1) 使用自定义主题,或 2) 实施真正的 content-based 路由器。

如果有意义,您可以通过使用 Rebus 的主题 API 来处理路由,从而使用主题对这个 pub/sub 场景进行建模。如果您可以说您的每条数据消息都属于某个类别,然后您的订阅者可以订阅该类别,那么这是有道理的。

与 "real" topics-based 排队系统相比,例如RabbitMQ,Rebus中的topicsAPI很粗糙。它不允许使用通配符 (*) 或类似的任何高级内容 – 主题只是您可以订阅的简单字符串,然后用作 pub/sub 频道以将事件路由到多个订阅者。

你可以在订阅端这样使用它:

await bus.Advanced.Topics.Subscribe("department_a");

然后在发布者端:

var data = new Data(...);

await bus.Advanced.Topics.Publish("department_a", data);

如果这还不行,您可以插入一个 "real" content-based 路由器,它只是您 await bus.Send(eachDataMessage) 的一个端点,它又将消息转发到相关订阅者。

根据您的要求,可以使用 Rebus 在两个级别上完成。如果只看消息的 headers 就足够了,你应该将其实现为 "transport message forwarder",因为这样会跳过反序列化并提供一个很好的 API 来简单地转发消息:

Configure.With(...)
    .Transport(t => t.UseMsmq("router"))
    .Routing(r => {
        r.AddTransportMessageForwarder(async transportMessage => {
            var headers = transportMessage.Headers;

            var subscribers = Decide(headers);

            return ForwardAction.ForwardTo(subscribers);
        });
    })
    .Start();

如果你需要查看实际的消息,你应该只实现一个普通的消息处理程序,然后使用总线转发消息:

public class Router : IHandleMessages<Data>
{
    readonly IBus _bus;

    public Router(IBus bus)
    {
        _bus = bus;
    }   

    public async Task Handle(Data message)
    {
        var subscribers = Decide(message);

        foreach(var subscriber in subscribers)
        {
            await _bus.Advanced.TransportMessage.ForwardTo(subscriber);
        }
    }
}

custom-implemented 路由器是最灵活的解决方案,因为您可以实现您喜欢的任何逻辑,但如您所见,它稍微复杂一些。


(*) Rebus 通常不允许使用通配符,尽管它 确实 将主题直接传递给 RabbitMQ,如果您恰好将其用作传输,意味着您实际上可以充分利用 RabbitMQ(有关更多信息,请参阅 this issue

    static void Main()
    {

        using (var activator = new BuiltinHandlerActivator())
        {
            activator.Handle<Packet>(async (bus, packet) =>
            {
                string subscriber = "subscriberA";
                await bus.Advanced.TransportMessage.Forward(subscriber); 
            });

            Configure.With(activator)
                .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn))
                .Transport(t => t.UseMsmq("router"))
                .Start();

            for (int i = 0; i < 10; i++)
            {
                activator.Bus.SendLocal(
                    new Packet()
                    {
                        ID = i,
                        Content = "content" + i.ToString(),
                        Sent = false,
                    }).Wait();
            }
        }

        Console.ReadLine();
    }
using (var trScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
   scope.EnlistRebus();
   Packet packet = ReadFromDB()
   activator.Bus.SendLocal(packet).Wait()
   scope.Complete()
}


activator.Handle<Packet>(async (bus, packet) =>
{
   string subscriber = "subscriberA";
   await bus.Advanced.TransportMessage.Forward(subscriber); 
});
        using (var activator = new BuiltinHandlerActivator())
        {
            activator.Handle<Packet>(async message =>
            {
                string connectionString =
                    "Data Source=.;Initial Catalog=Rebus;User ID=sa;Password=123456";

                using (SqlConnection connection = new SqlConnection(connectionString))
                {
                    string queryString = @"INSERT INTO CLIENTPACKET(ID, CONTENT, SENT) VALUES(@id, @content, @sent)";
                    connection.Open();

                    using (SqlCommand command = new SqlCommand(queryString, connection))
                    {
                        command.Parameters.Add(new SqlParameter("@id", message.ID));
                        command.Parameters.Add(new SqlParameter("@content", message.Content));
                        command.Parameters.Add(new SqlParameter("@sent", message.Sent));

                        await command.ExecuteNonQueryAsync();
                    }
                }
            });


            Configure.With(activator)
                .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn))
                .Transport(t => t.UseMsmq(@"subscriberA"))
                .Routing(r => r.TypeBased().MapAssemblyOf<Packet>("router"))
                .Options(o =>
                {
                    TransactionOptions tranOp = new TransactionOptions();
                    tranOp.IsolationLevel = IsolationLevel.ReadCommitted;
                    o.HandleMessagesInsideTransactionScope(tranOp);

                    o.SetNumberOfWorkers(2);
                    o.SetMaxParallelism(2);
                })
                .Start();

            activator.Bus.Subscribe<Packet>().Wait();

            Console.WriteLine("Press ENTER to quit");
            Console.ReadLine();
        }