通过ConsumeContext--Masstransit发布命令处理程序的命令结果
Publish command result of command handler through ConsumeContext --Masstransit
我已经为 masstransit 配置了一个 webapi 控制器。像这样。
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
{
x.Host(new Uri("rabbitmq://localhost/"), h =>{ });
});
_busControl.Start();
控制台应用程序是我的服务器,它正在侦听我的命令/事件 --
var BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
//h.Username("guest");
//h.Password("guest");
});
cfg.ReceiveEndpoint(host, "create_product_queue", config => config.Consumer<CreateInventoryProductItemHandler>());
cfg.ReceiveEndpoint(host, "ProductItemCreatedEvent", config => config.Handler<ProductItemCreatedEvent>(async context => await Console.Out.WriteLineAsync($"Product Item Id -----{context.Message.ProductItemId} "))); <---- this never gets executed .
});
BusControl.Start();
控制器的动作类似于 -
[HttpPost("product")]
public async Task<IActionResult> Post([FromBody] Product Product)
{
var endpoint = await Startup.Bus.GetSendEndpoint(new Uri("rabbitmq://localhost/create_product_queue"));
await endpoint.Send<CreateInventoryProductItem>(new
{
ProductName = Product.ProductName,
PartNumber = Product.PartNumber,
ManufactureDate = Product.ManufacturedDate
});
return new HttpStatusCodeResult(StatusCodes.Status202Accepted);
}
处理程序是--
public async Task Consume(ConsumeContext<CreateInventoryProductItem> context)
{
CreateInventoryProductItem Command = context.Message;
Product Product = await Context.SaveAsync(new Product(Command.ProductName
, Command.PartNumber
, Command.ManufacturedDate));
await context.Publish<ProductItemCreatedEvent>(Product.Id); <--- problem area
}
现在我的场景是。
创建命令对象。
通过新创建的Entity frameworkContext.SaveAsync()
方法获取产品Id
将 ProductId 发布到总线,以便其他侦听器可以侦听此产品 Id 并据此采取行动。
我正在尝试找出适用于这种情况的最佳模式。
现在问题从 -- await context.Publish<ProductItemCreatedEvent>(Product.Id);
这一行开始。它尝试一次又一次地无限地发布到同一个队列。结果,同一个 Product 对象被插入了数百次(我知道这很蹩脚,因为没有唯一约束,但这不是这里的问题),发生的是我的命令处理程序尽管是强类型的,但它会无限期地执行。如何摆脱这种情况。我对公共交通概念还很陌生。
所以有人可以阐明为什么会发生这种情况。
所以问题是您需要将一个对象传递给您正在使用的 Publish
重载:
await context.Publish<ProductItemCreatedEvent>(Product.Id);
如果 ProductItemCreatedEvent
包含两个属性,Id
和 Description
,您将创建一个匿名对象来初始化接口:
await context.Publish<ProductItemCreatedEvent>(new
{
Id = Product.Id,
Description = Product.Description,
});
这样,传递了一个具有属性的对象,它可以为事件接口启动动态生成的支持class。
我已经为 masstransit 配置了一个 webapi 控制器。像这样。
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
{
x.Host(new Uri("rabbitmq://localhost/"), h =>{ });
});
_busControl.Start();
控制台应用程序是我的服务器,它正在侦听我的命令/事件 --
var BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
//h.Username("guest");
//h.Password("guest");
});
cfg.ReceiveEndpoint(host, "create_product_queue", config => config.Consumer<CreateInventoryProductItemHandler>());
cfg.ReceiveEndpoint(host, "ProductItemCreatedEvent", config => config.Handler<ProductItemCreatedEvent>(async context => await Console.Out.WriteLineAsync($"Product Item Id -----{context.Message.ProductItemId} "))); <---- this never gets executed .
});
BusControl.Start();
控制器的动作类似于 -
[HttpPost("product")]
public async Task<IActionResult> Post([FromBody] Product Product)
{
var endpoint = await Startup.Bus.GetSendEndpoint(new Uri("rabbitmq://localhost/create_product_queue"));
await endpoint.Send<CreateInventoryProductItem>(new
{
ProductName = Product.ProductName,
PartNumber = Product.PartNumber,
ManufactureDate = Product.ManufacturedDate
});
return new HttpStatusCodeResult(StatusCodes.Status202Accepted);
}
处理程序是--
public async Task Consume(ConsumeContext<CreateInventoryProductItem> context)
{
CreateInventoryProductItem Command = context.Message;
Product Product = await Context.SaveAsync(new Product(Command.ProductName
, Command.PartNumber
, Command.ManufacturedDate));
await context.Publish<ProductItemCreatedEvent>(Product.Id); <--- problem area
}
现在我的场景是。
创建命令对象。
通过新创建的Entity framework
Context.SaveAsync()
方法获取产品Id将 ProductId 发布到总线,以便其他侦听器可以侦听此产品 Id 并据此采取行动。
我正在尝试找出适用于这种情况的最佳模式。
现在问题从 -- await context.Publish<ProductItemCreatedEvent>(Product.Id);
这一行开始。它尝试一次又一次地无限地发布到同一个队列。结果,同一个 Product 对象被插入了数百次(我知道这很蹩脚,因为没有唯一约束,但这不是这里的问题),发生的是我的命令处理程序尽管是强类型的,但它会无限期地执行。如何摆脱这种情况。我对公共交通概念还很陌生。
所以有人可以阐明为什么会发生这种情况。
所以问题是您需要将一个对象传递给您正在使用的 Publish
重载:
await context.Publish<ProductItemCreatedEvent>(Product.Id);
如果 ProductItemCreatedEvent
包含两个属性,Id
和 Description
,您将创建一个匿名对象来初始化接口:
await context.Publish<ProductItemCreatedEvent>(new
{
Id = Product.Id,
Description = Product.Description,
});
这样,传递了一个具有属性的对象,它可以为事件接口启动动态生成的支持class。