MassTransit:在消费者中创建回调队列
MassTransit: Creating a callback queue in consumer
MassTransit 的新手,仍在尝试一些教程项目。我将有一项服务 运行 可能持续 20 分钟,完成后我需要做一些事情。因为它可能需要很长时间,所以我不想遵循 request/response 模式并等待响应,从而阻止线程。我认为我的另一个选择是创建另一个队列,仅供消费者在工作完成时发布。我看过这个 post:, but I'm not sure how to implement this. My projects, again from this 教程,看起来像这样:
发布者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
x.Host(new Uri("rabbitmq://localhost/"), h => {}));
var busHandle = bus.Start();
var text = ""'
Console.WriteLine("Publisher");
while(text != "quit")
{
Console.Write("Enter a message: ");
text = Console.ReadLine();
var message = new SomethingHappenedMessage()
{
What = text,
When = DateTime.Now
}
bus.Publish(message);
}
busHandle.Stop();
}
订阅者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost"), h => {});
x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
e.Consumer<SomethingHappenedConsumer>());
});
Console.WriteLine("Subscriber");
var busHandle = bus.Start();
Console.ReadKey();
busHandle.Stop();
}
消费者:
class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
public Task Consume(ConsumeContext<ISomethingHappened> context)
{
Console.Write("TXT: " + context.Message.What);
Console.Write(" SENT: " + context.Message.When);
Console.Write(" PROCESSED: " + DateTime.Now);
Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
return Task.FromResult(0);
}
}
我将如何着手在消费者中创建回调队列?
在您的消费者中,只需 Bus.Publish(new ResponseMessage());
(或您对响应的任何称呼)并让您的发布者为该消息类型注册一个消费者。您的发布者似乎没有绑定到队列,只需创建一个队列名称并将其也绑定到队列即可。
再次感谢@Travis 的帮助。只是想向未来的任何人展示我最终得到的最终代码。消息的响应看起来很有趣,但它正确地回传给了发布者。
发布者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
x.ReceiveEndpoint(host, "MtPubSubExample_TestPublisher", e =>
e.Consumer<ResponseConsumer>());
});
var busHandle = bus.Start();
var text = "";
Console.WriteLine("Publisher");
while(text != "quit")
{
Console.Write("Enter a message: ");
text = Console.ReadLine();
var message = new SomethingHappenedMessage()
{
What = text,
When = DateTime.Now
};
bus.Publish(message);
}
busHandle.Stop();
}
响应消费者:
class ResponseConsumer : IConsumer<IResponse>
{
public Task Consume(ConsumeContext<IResponse> context)
{
Console.WriteLine("RESPONSE MESSAGE: " + context.Message.Message);
return Task.FromResult(0);
}
}
订阅者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
e.Consumer<SomethingHappenedConsumer>());
});
Console.WriteLine("Subscriber");
var busHandle = bus.Start();
Console.ReadKey();
busHandle.Stop();
}
订户消费者:
class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
private IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x =>
x.Host(new Uri("rabbitmq://localhost/"), h => { }));
public Task Consume(ConsumeContext<ISomethingHappened> context)
{
var now = DateTime.Now;
Console.Write("TXT: " + context.Message.What);
Console.Write(" SENT: " + context.Message.When);
Console.Write(" PROCESSED: " + now);
Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
var response = new ResponseMessage()
{
Message = "The request was processed at " + now
};
bus.Publish(response);
return Task.FromResult(0);
}
}
MassTransit 的新手,仍在尝试一些教程项目。我将有一项服务 运行 可能持续 20 分钟,完成后我需要做一些事情。因为它可能需要很长时间,所以我不想遵循 request/response 模式并等待响应,从而阻止线程。我认为我的另一个选择是创建另一个队列,仅供消费者在工作完成时发布。我看过这个 post:
发布者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
x.Host(new Uri("rabbitmq://localhost/"), h => {}));
var busHandle = bus.Start();
var text = ""'
Console.WriteLine("Publisher");
while(text != "quit")
{
Console.Write("Enter a message: ");
text = Console.ReadLine();
var message = new SomethingHappenedMessage()
{
What = text,
When = DateTime.Now
}
bus.Publish(message);
}
busHandle.Stop();
}
订阅者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost"), h => {});
x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
e.Consumer<SomethingHappenedConsumer>());
});
Console.WriteLine("Subscriber");
var busHandle = bus.Start();
Console.ReadKey();
busHandle.Stop();
}
消费者:
class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
public Task Consume(ConsumeContext<ISomethingHappened> context)
{
Console.Write("TXT: " + context.Message.What);
Console.Write(" SENT: " + context.Message.When);
Console.Write(" PROCESSED: " + DateTime.Now);
Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
return Task.FromResult(0);
}
}
我将如何着手在消费者中创建回调队列?
在您的消费者中,只需 Bus.Publish(new ResponseMessage());
(或您对响应的任何称呼)并让您的发布者为该消息类型注册一个消费者。您的发布者似乎没有绑定到队列,只需创建一个队列名称并将其也绑定到队列即可。
再次感谢@Travis 的帮助。只是想向未来的任何人展示我最终得到的最终代码。消息的响应看起来很有趣,但它正确地回传给了发布者。
发布者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
x.ReceiveEndpoint(host, "MtPubSubExample_TestPublisher", e =>
e.Consumer<ResponseConsumer>());
});
var busHandle = bus.Start();
var text = "";
Console.WriteLine("Publisher");
while(text != "quit")
{
Console.Write("Enter a message: ");
text = Console.ReadLine();
var message = new SomethingHappenedMessage()
{
What = text,
When = DateTime.Now
};
bus.Publish(message);
}
busHandle.Stop();
}
响应消费者:
class ResponseConsumer : IConsumer<IResponse>
{
public Task Consume(ConsumeContext<IResponse> context)
{
Console.WriteLine("RESPONSE MESSAGE: " + context.Message.Message);
return Task.FromResult(0);
}
}
订阅者:
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });
x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
e.Consumer<SomethingHappenedConsumer>());
});
Console.WriteLine("Subscriber");
var busHandle = bus.Start();
Console.ReadKey();
busHandle.Stop();
}
订户消费者:
class SomethingHappenedConsumer : IConsumer<ISomethingHappened>
{
private IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x =>
x.Host(new Uri("rabbitmq://localhost/"), h => { }));
public Task Consume(ConsumeContext<ISomethingHappened> context)
{
var now = DateTime.Now;
Console.Write("TXT: " + context.Message.What);
Console.Write(" SENT: " + context.Message.When);
Console.Write(" PROCESSED: " + now);
Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
var response = new ResponseMessage()
{
Message = "The request was processed at " + now
};
bus.Publish(response);
return Task.FromResult(0);
}
}