NServiceBus 7 - 如何发送多个连续请求?
NServiceBus 7 - How to send multiple consecutive requests?
我正在尝试使用 NServiceBus 和 RabbitMQ 发出多个连续请求。这是我的代码:
/// <summary>
/// Console program that starts an NServiceBus endpoint named
/// "NServiceBus.RequestResponse" on the RabbitMQ transport and sends two
/// consecutive request/response messages to that same endpoint name.
/// </summary>
class Program
{
/// <summary>
/// Synchronous entry point; blocks until <see cref="MainAsync"/> completes.
/// </summary>
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
/// <summary>
/// Configures logging and the endpoint, starts it, issues two requests,
/// logs the replies (or any exception), stops the endpoint and waits for a key press.
/// </summary>
static async Task MainAsync()
{
ConfigureLog();
var ec = new EndpointConfiguration("NServiceBus.RequestResponse");
ec.UsePersistence<InMemoryPersistence>();
ec.EnableInstallers();
// Callbacks are enabled and this instance gets a fresh GUID as its unique address.
// NOTE(review): presumably both are required for Request<T>() replies to be routed
// back to this instance - confirm against the NServiceBus callbacks documentation.
ec.EnableCallbacks();
ec.MakeInstanceUniquelyAddressable(Guid.NewGuid().ToString());
ec.LimitMessageProcessingConcurrencyTo(1);
// RabbitMQ transport against a local broker using the guest account.
var transport = ec.UseTransport<RabbitMQTransport>();
transport.UseConventionalRoutingTopology();
transport.ConnectionString("host=localhost;username=guest;password=guest");
transport.PrefetchCount(100);
var endpoint = await Endpoint.Start(ec)
.ConfigureAwait(false);
try
{
// The destination is this endpoint's own name, so MyMessageHandler in this
// process is expected to handle the requests.
var options = new SendOptions();
options.SetDestination("NServiceBus.RequestResponse");
Log($"Sending message: 1");
var response1 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello World!!" }, options);
Log($"Response 1 : {response1.Text}");
Log($"Sending message: 2");
// NOTE(review): the same SendOptions instance from the first request is passed again.
// This is the call reported to throw "Already specified reply routing option for this
// message: RouteReplyToThisInstance"; a new SendOptions per request avoids it.
var response2 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello from second request!!" }, options);
// NOTE(review): the label below says "Response 1" although it logs the second response.
Log($"Response 1 : {response2.Text}");
}
catch (Exception e)
{
// Any failure is only logged; execution continues to the finally block.
Log("#### ERROR ####");
Log(e.Message);
Log(e.StackTrace);
}
finally
{
// The endpoint is stopped whether or not the requests succeeded.
await endpoint.Stop()
.ConfigureAwait(false);
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
/// <summary>
/// Writes <paramref name="msg"/> at Debug level to the "NServiceBus.RequestResponse"
/// logger and also directly to the console.
/// </summary>
private static void Log(string msg)
{
LogManager.GetLogger("NServiceBus.RequestResponse").Debug(msg);
Console.WriteLine(msg);
}
/// <summary>
/// Sets up log4net with a console appender and a file appender ("nsb_log.txt")
/// sharing one pattern layout, then tells NServiceBus to log through Log4NetFactory.
/// </summary>
private static void ConfigureLog()
{
var layout = new PatternLayout
{
ConversionPattern = "%d [%t] %-5p %c [%x] - %m%n"
};
layout.ActivateOptions();
var consoleAppender = new ConsoleAppender
{
Threshold = Level.All,
Layout = layout
};
consoleAppender.ActivateOptions();
var fileAppender = new FileAppender()
{
Threshold = Level.All,
Layout = layout,
File = "nsb_log.txt"
};
fileAppender.ActivateOptions();
// Both appenders are attached to the log4net repository of the executing assembly.
var executingAssembly = Assembly.GetExecutingAssembly();
var repository = log4net.LogManager.GetRepository(executingAssembly);
BasicConfigurator.Configure(repository, consoleAppender, fileAppender);
LogManager.Use<Log4NetFactory>();
}
}
/// <summary>
/// Request message sent by Program through endpoint.Request and handled by MyMessageHandler.
/// </summary>
public class MyMessage : IMessage
{
/// <summary>Text payload of the request.</summary>
public string Text { get; set; }
}
/// <summary>
/// Handles <see cref="MyMessage"/>: prints the received message to the console
/// and replies with a fixed <see cref="MyResponse"/>.
/// </summary>
public class MyMessageHandler : IHandleMessages<MyMessage>
{
    /// <summary>
    /// Writes the message id and text to standard output, then sends the reply.
    /// </summary>
    /// <param name="message">The incoming request.</param>
    /// <param name="context">Handler context used to read the message id and to reply.</param>
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var receivedLine = $"Message [{context.MessageId}] received: {message.Text} ";
        await Console.Out.WriteLineAsync(receivedLine);

        var reply = new MyResponse { Text = "Hi There!!" };
        await context.Reply(reply);
    }
}
/// <summary>
/// Reply message sent by MyMessageHandler via context.Reply and awaited by the requester.
/// </summary>
public class MyResponse : IMessage
{
/// <summary>Text payload of the reply.</summary>
public string Text { get; set; }
}
当我运行程序时,第二次请求尝试出现以下异常:
2019-02-25 11:03:22,800 [9] DEBUG NServiceBus.RequestResponse [(null)] - Already specified reply routing option for this message: RouteReplyToThisInstance
2019-02-25 11:03:22,811 [9] DEBUG NServiceBus.RequestResponse [(null)] - at NServiceBus.ApplyReplyToAddressBehavior.State.set_Option(RouteOption value)
at NServiceBus.RoutingOptionExtensions.RouteReplyToThisInstance(SendOptions options)
at NServiceBus.RequestResponseExtensions.<Request>d__3`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at NServiceBus.RequestResponse.Program.<MainAsync>d__1.MoveNext() in C:\Users\cruzedu\source\repos\NServiceBus.RequestResponse\NServiceBus.RequestResponse\Program.cs:line 47
我知道我可以更改代码以调用 Send 方法并有一个响应处理程序,但我仍然不明白为什么它不能处理请求。我需要做什么才能让它工作?
我试过设置消息 ID,在第二次调用时不传递发送选项等。似乎没有任何效果。我认为这可能是一个错误。
我创建了一个示例项目来重现该行为并在 GitHub 上分享了它。这是链接。需要指出的是,它需要一个 RabbitMQ 实例。在我的环境中,我在本地搭建了它 (http://localhost:15672),并使用默认的 guest 用户名/密码。
您每次发送消息时都需要创建一个新的 SendOptions 实例,因为之前用过的实例不能重复使用:第一次调用 Request 时已经在该实例上设置了回复路由选项 (RouteReplyToThisInstance),再次传入同一个实例就会抛出上面的异常。
// Each request gets its own SendOptions instance; an instance that has already
// been passed to Request must not be passed again.
var firstOptions = new SendOptions();
firstOptions.SetDestination("NServiceBus.RequestResponse");
var response1 = await endpoint.Request<MyResponse>(new MyMessage { Text = "Hello World!!" }, firstOptions);

var secondOptions = new SendOptions();
secondOptions.SetDestination("NServiceBus.RequestResponse");
var response2 = await endpoint.Request<MyResponse>(new MyMessage { Text = "Hello from second request!!" }, secondOptions);
我正在尝试使用 NServiceBus 和 RabbitMQ 发出多个连续请求。这是我的代码:
/// <summary>
/// Console program that starts an NServiceBus endpoint named
/// "NServiceBus.RequestResponse" on the RabbitMQ transport and sends two
/// consecutive request/response messages to that same endpoint name.
/// </summary>
class Program
{
/// <summary>
/// Synchronous entry point; blocks until <see cref="MainAsync"/> completes.
/// </summary>
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
/// <summary>
/// Configures logging and the endpoint, starts it, issues two requests,
/// logs the replies (or any exception), stops the endpoint and waits for a key press.
/// </summary>
static async Task MainAsync()
{
ConfigureLog();
var ec = new EndpointConfiguration("NServiceBus.RequestResponse");
ec.UsePersistence<InMemoryPersistence>();
ec.EnableInstallers();
// Callbacks are enabled and this instance gets a fresh GUID as its unique address.
// NOTE(review): presumably both are required for Request<T>() replies to be routed
// back to this instance - confirm against the NServiceBus callbacks documentation.
ec.EnableCallbacks();
ec.MakeInstanceUniquelyAddressable(Guid.NewGuid().ToString());
ec.LimitMessageProcessingConcurrencyTo(1);
// RabbitMQ transport against a local broker using the guest account.
var transport = ec.UseTransport<RabbitMQTransport>();
transport.UseConventionalRoutingTopology();
transport.ConnectionString("host=localhost;username=guest;password=guest");
transport.PrefetchCount(100);
var endpoint = await Endpoint.Start(ec)
.ConfigureAwait(false);
try
{
// The destination is this endpoint's own name, so MyMessageHandler in this
// process is expected to handle the requests.
var options = new SendOptions();
options.SetDestination("NServiceBus.RequestResponse");
Log($"Sending message: 1");
var response1 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello World!!" }, options);
Log($"Response 1 : {response1.Text}");
Log($"Sending message: 2");
// NOTE(review): the same SendOptions instance from the first request is passed again.
// This is the call reported to throw "Already specified reply routing option for this
// message: RouteReplyToThisInstance"; a new SendOptions per request avoids it.
var response2 = await endpoint.Request<MyResponse>(new MyMessage() { Text = "Hello from second request!!" }, options);
// NOTE(review): the label below says "Response 1" although it logs the second response.
Log($"Response 1 : {response2.Text}");
}
catch (Exception e)
{
// Any failure is only logged; execution continues to the finally block.
Log("#### ERROR ####");
Log(e.Message);
Log(e.StackTrace);
}
finally
{
// The endpoint is stopped whether or not the requests succeeded.
await endpoint.Stop()
.ConfigureAwait(false);
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
/// <summary>
/// Writes <paramref name="msg"/> at Debug level to the "NServiceBus.RequestResponse"
/// logger and also directly to the console.
/// </summary>
private static void Log(string msg)
{
LogManager.GetLogger("NServiceBus.RequestResponse").Debug(msg);
Console.WriteLine(msg);
}
/// <summary>
/// Sets up log4net with a console appender and a file appender ("nsb_log.txt")
/// sharing one pattern layout, then tells NServiceBus to log through Log4NetFactory.
/// </summary>
private static void ConfigureLog()
{
var layout = new PatternLayout
{
ConversionPattern = "%d [%t] %-5p %c [%x] - %m%n"
};
layout.ActivateOptions();
var consoleAppender = new ConsoleAppender
{
Threshold = Level.All,
Layout = layout
};
consoleAppender.ActivateOptions();
var fileAppender = new FileAppender()
{
Threshold = Level.All,
Layout = layout,
File = "nsb_log.txt"
};
fileAppender.ActivateOptions();
// Both appenders are attached to the log4net repository of the executing assembly.
var executingAssembly = Assembly.GetExecutingAssembly();
var repository = log4net.LogManager.GetRepository(executingAssembly);
BasicConfigurator.Configure(repository, consoleAppender, fileAppender);
LogManager.Use<Log4NetFactory>();
}
}
/// <summary>
/// Request message sent by Program through endpoint.Request and handled by MyMessageHandler.
/// </summary>
public class MyMessage : IMessage
{
/// <summary>Text payload of the request.</summary>
public string Text { get; set; }
}
/// <summary>
/// Handles <see cref="MyMessage"/>: prints the received message to the console
/// and replies with a fixed <see cref="MyResponse"/>.
/// </summary>
public class MyMessageHandler : IHandleMessages<MyMessage>
{
    /// <summary>
    /// Writes the message id and text to standard output, then sends the reply.
    /// </summary>
    /// <param name="message">The incoming request.</param>
    /// <param name="context">Handler context used to read the message id and to reply.</param>
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var receivedLine = $"Message [{context.MessageId}] received: {message.Text} ";
        await Console.Out.WriteLineAsync(receivedLine);

        var reply = new MyResponse { Text = "Hi There!!" };
        await context.Reply(reply);
    }
}
/// <summary>
/// Reply message sent by MyMessageHandler via context.Reply and awaited by the requester.
/// </summary>
public class MyResponse : IMessage
{
/// <summary>Text payload of the reply.</summary>
public string Text { get; set; }
}
当我运行程序时,第二次请求尝试出现以下异常:
2019-02-25 11:03:22,800 [9] DEBUG NServiceBus.RequestResponse [(null)] - Already specified reply routing option for this message: RouteReplyToThisInstance
2019-02-25 11:03:22,811 [9] DEBUG NServiceBus.RequestResponse [(null)] - at NServiceBus.ApplyReplyToAddressBehavior.State.set_Option(RouteOption value)
at NServiceBus.RoutingOptionExtensions.RouteReplyToThisInstance(SendOptions options)
at NServiceBus.RequestResponseExtensions.<Request>d__3`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at NServiceBus.RequestResponse.Program.<MainAsync>d__1.MoveNext() in C:\Users\cruzedu\source\repos\NServiceBus.RequestResponse\NServiceBus.RequestResponse\Program.cs:line 47
我知道我可以更改代码以调用 Send 方法并有一个响应处理程序,但我仍然不明白为什么它不能处理请求。我需要做什么才能让它工作?
我试过设置消息 ID,在第二次调用时不传递发送选项等。似乎没有任何效果。我认为这可能是一个错误。
我创建了一个示例项目来重现该行为并在 GitHub 上分享了它。这是链接。需要指出的是,它需要一个 RabbitMQ 实例。在我的环境中,我在本地搭建了它 (http://localhost:15672),并使用默认的 guest 用户名/密码。
您每次发送消息时都需要创建一个新的 SendOptions 实例,因为之前用过的实例不能重复使用:第一次调用 Request 时已经在该实例上设置了回复路由选项 (RouteReplyToThisInstance),再次传入同一个实例就会抛出上面的异常。
// Each request gets its own SendOptions instance; an instance that has already
// been passed to Request must not be passed again.
var firstOptions = new SendOptions();
firstOptions.SetDestination("NServiceBus.RequestResponse");
var response1 = await endpoint.Request<MyResponse>(new MyMessage { Text = "Hello World!!" }, firstOptions);

var secondOptions = new SendOptions();
secondOptions.SetDestination("NServiceBus.RequestResponse");
var response2 = await endpoint.Request<MyResponse>(new MyMessage { Text = "Hello from second request!!" }, secondOptions);