使用来自一个 rabbitmq 主机的消息并使用 mass transit 和 .net core 发布到不同的 rabbitmq 主机
Consume a message from one rabbitmq host and publish to a different rabbitmq host using mass transit and .net core
我似乎在使用不同的 rabbitmq 主机发布/使用消息时遇到了问题。最初,发布/订阅发生在一台 rabbitmq 主机上。决定创建另一个主机(位于不同的服务器上),从那以后我 运行 遇到了一些问题。我已经阅读了多总线文档 (https://masstransit-project.com/usage/containers/multibus.html) 并尝试实现它,以便我可以有一个总线用于生成消息和一个总线用于使用消息。请在下面找到代码:
--- Startup.cs ---
services.AddMassTransit(x =>
{
var section = configuration.GetSection("Rabbit1");
x.AddConsumer<SomeConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.ConfigureConsumer<SomeConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransit<ISecondBus>(x =>
{
var section = configuration.GetSection("Rabbit2");
x.AddConsumer<SomeOtherConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.PrefetchCount = 4;
e.UseMessageRetry(r => r.Interval(10, 100));
e.ConfigureConsumer<SomeOtherConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
--- ISecondBus.cs ---
public interface ISecondBus : IBus
{
}
--- Consumer.cs ---
public class SomeConsumer : IConsumer<MessageObjectList>
{
private readonly IBus _bus;
public SomeConsumer(IBus bus)
{
_bus = bus;
}
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
var message = context.Message;
//Aggregate data
var newList = new MessageObjectList
{
Message = message
};
var endpoint = await _bus.GetSendEndpoint(new Uri(_configuration.GetConnectionString("Rabbit2") + "/" + QUEUE_NAME));
await endpoint.Send(newList);
}
}
以上代码运行良好,成功消费了来自rabbit1的消息。但是,当向Rabbit2发送消息时,我可以看到正在创建队列,但是没有数据被持久化到队列中。
见下方第二个消费者:
--- Startup.cs ---
services.AddMassTransit(x =>
{
var section = configuration.GetSection("Rabbitmq2");
x.AddConsumer<SomeOtherConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.ConfigureConsumer<SomeOtherConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
--- 其他Consumer.cs ---
public class SomeOtherConsumer: IConsumer<MessageObjectList>
{
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
//Persist data to db
}
}
这里的问题是 rabbit2 的队列永远不会被填充(但它确实被创建)所以没有消息被第二个消费者消费。
如有任何帮助,我们将不胜感激。
如果要从第一条总线发送到第二条总线,需要在第一条总线上的consumer中使用第二条总线接口。
public class SomeConsumer : IConsumer<MessageObjectList>
{
private readonly ISecondBus _bus;
public SomeConsumer(ISecondBus bus)
{
_bus = bus;
}
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
var message = context.Message;
//Aggregate data
var newList = new MessageObjectList
{
Message = message
};
var endpoint = await _bus.GetSendEndpoint(new Uri("queue:" + QUEUE_NAME));
await endpoint.Send(newList);
}
}
我似乎在使用不同的 rabbitmq 主机发布/使用消息时遇到了问题。最初,发布/订阅发生在一台 rabbitmq 主机上。决定创建另一个主机(位于不同的服务器上),从那以后我 运行 遇到了一些问题。我已经阅读了多总线文档 (https://masstransit-project.com/usage/containers/multibus.html) 并尝试实现它,以便我可以有一个总线用于生成消息和一个总线用于使用消息。请在下面找到代码:
--- Startup.cs ---
services.AddMassTransit(x =>
{
var section = configuration.GetSection("Rabbit1");
x.AddConsumer<SomeConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.ConfigureConsumer<SomeConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransit<ISecondBus>(x =>
{
var section = configuration.GetSection("Rabbit2");
x.AddConsumer<SomeOtherConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.PrefetchCount = 4;
e.UseMessageRetry(r => r.Interval(10, 100));
e.ConfigureConsumer<SomeOtherConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
--- ISecondBus.cs ---
public interface ISecondBus : IBus
{
}
--- Consumer.cs ---
public class SomeConsumer : IConsumer<MessageObjectList>
{
private readonly IBus _bus;
public SomeConsumer(IBus bus)
{
_bus = bus;
}
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
var message = context.Message;
//Aggregate data
var newList = new MessageObjectList
{
Message = message
};
var endpoint = await _bus.GetSendEndpoint(new Uri(_configuration.GetConnectionString("Rabbit2") + "/" + QUEUE_NAME));
await endpoint.Send(newList);
}
}
以上代码运行良好,成功消费了来自rabbit1的消息。但是,当向Rabbit2发送消息时,我可以看到正在创建队列,但是没有数据被持久化到队列中。
见下方第二个消费者:
--- Startup.cs ---
services.AddMassTransit(x =>
{
var section = configuration.GetSection("Rabbitmq2");
x.AddConsumer<SomeOtherConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.ConfigureConsumer<SomeOtherConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
--- 其他Consumer.cs ---
public class SomeOtherConsumer: IConsumer<MessageObjectList>
{
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
//Persist data to db
}
}
这里的问题是 rabbit2 的队列永远不会被填充(但它确实被创建)所以没有消息被第二个消费者消费。
如有任何帮助,我们将不胜感激。
如果要从第一条总线发送到第二条总线,需要在第一条总线上的consumer中使用第二条总线接口。
public class SomeConsumer : IConsumer<MessageObjectList>
{
private readonly ISecondBus _bus;
public SomeConsumer(ISecondBus bus)
{
_bus = bus;
}
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
var message = context.Message;
//Aggregate data
var newList = new MessageObjectList
{
Message = message
};
var endpoint = await _bus.GetSendEndpoint(new Uri("queue:" + QUEUE_NAME));
await endpoint.Send(newList);
}
}