Akka.net 在 Azure WebJob 中使用时询问超时
Akka.net Ask timeout when used in Azure WebJob
在工作中,我们在使用 Rabbit 的 Azure WebJob 中有一些代码
基本的工作流程是这样的
- 一条消息到达 RabbitMQ 队列
- 我们有一个传入消息的消息处理程序
- 在消息处理程序中,我们启动了一个顶级(用户)主管角色,我们 "ask" 它用来处理消息
supervisor actor层次结构是这样的
相关的顶级代码是这样的(这是 WebJob 代码)
static void Main(string[] args)
{
try
{
//Bootstrap akka IoC resolver well ahead of any actor usages
new AutoFacDependencyResolver(ContainerOperations.Instance.Container, ContainerOperations.Instance.Container.Resolve<ActorSystem>());
var system = ContainerOperations.Instance.Container.Resolve<ActorSystem>();
var busQueueReader = ContainerOperations.Instance.Container.Resolve<IBusQueueReader>();
var dateTime = ContainerOperations.Instance.Container.Resolve<IDateTime>();
busQueueReader.AddHandler<ProgramCalculationMessage>("RabbitQueue", x =>
{
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
try
{
//SupervisorActor is a singleton
var supervisorActor = ContainerOperations.Instance.Container.ResolveNamed<IActorRef>("SupervisorActor");
var actorMessage = new SomeActorMessage();
var supervisorRunTask = runModelSupervisorActor.Ask(actorMessage, TimeSpan.FromMinutes(25));
//we want to wait this guy out
var supervisorRunResult = supervisorRunTask.GetAwaiter().GetResult();
switch (supervisorRunResult)
{
case CompletedEvent completed:
{
break;
}
case FailedEvent failed:
{
throw failed.Exception;
}
}
}
catch (Exception ex)
{
_log.Error(ex, "Error found in Webjob");
//throw it for the actual RabbitMqQueueReader Handler so message gets NACK
throw;
}
});
Thread.Sleep(Timeout.Infinite);
}
catch (Exception ex)
{
_log.Error(ex, "Error found");
throw;
}
}
这是相关的 IOC 代码(我们使用 Autofac + Akka.NET DI for Autofac)
builder.RegisterType<SupervisorActor>();
_actorSystem = new Lazy<ActorSystem>(() =>
{
var akkaconf = ActorUtil.LoadConfig(_akkaConfigPath).WithFallback(ConfigurationFactory.Default());
return ActorSystem.Create("WebJobSystem", akkaconf);
});
builder.Register<ActorSystem>(cont => _actorSystem.Value);
builder.Register(cont =>
{
var system = cont.Resolve<ActorSystem>();
return system.ActorOf(system.DI().Props<SupervisorActor>(),"SupervisorActor");
})
.SingleInstance()
.Named<IActorRef>("SupervisorActor");
问题
除了上面 WebJob 代码中显示的 Akka.Net "ask" 超时之外,代码工作正常并按照我们的要求执行。
烦人的是,如果我尝试在本地 运行 Web 作业,这似乎工作正常。我可以通过提供一个新的 supervisorActor 来模拟 "ask" 超时,它根本不会响应 "Sender" 的消息。
这在我的机器上 运行 完美运行,但是当我们在 Azure 中 运行 这段代码时,我们没有看到 "ask" 的超时,即使我们的工作流程之一运行s 超过 "ask" 超时一英里。
我只是不知道是什么导致了这种行为,有人有什么想法吗?
我需要为 WebJob 设置一些特定于 Azure 的配置值吗?
答案是使用显然出现在 C# rabbit 客户端 V5.0 中的异步 rabbit 处理程序。官方文档仍然显示同步用法(很遗憾)。
这篇文章很不错:https://gigi.nullneuron.net/gigilabs/asynchronous-rabbitmq-consumers-in-net/
一旦我们这样做了,一切都很好
在工作中,我们在使用 Rabbit 的 Azure WebJob 中有一些代码
基本的工作流程是这样的
- 一条消息到达 RabbitMQ 队列
- 我们有一个传入消息的消息处理程序
- 在消息处理程序中,我们启动了一个顶级(用户)主管角色,我们 "ask" 它用来处理消息
supervisor actor层次结构是这样的
相关的顶级代码是这样的(这是 WebJob 代码)
static void Main(string[] args)
{
try
{
//Bootstrap akka IoC resolver well ahead of any actor usages
new AutoFacDependencyResolver(ContainerOperations.Instance.Container, ContainerOperations.Instance.Container.Resolve<ActorSystem>());
var system = ContainerOperations.Instance.Container.Resolve<ActorSystem>();
var busQueueReader = ContainerOperations.Instance.Container.Resolve<IBusQueueReader>();
var dateTime = ContainerOperations.Instance.Container.Resolve<IDateTime>();
busQueueReader.AddHandler<ProgramCalculationMessage>("RabbitQueue", x =>
{
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
//This is code that gets called whenever we have a RabbitMQ message arrive
try
{
//SupervisorActor is a singleton
var supervisorActor = ContainerOperations.Instance.Container.ResolveNamed<IActorRef>("SupervisorActor");
var actorMessage = new SomeActorMessage();
var supervisorRunTask = runModelSupervisorActor.Ask(actorMessage, TimeSpan.FromMinutes(25));
//we want to wait this guy out
var supervisorRunResult = supervisorRunTask.GetAwaiter().GetResult();
switch (supervisorRunResult)
{
case CompletedEvent completed:
{
break;
}
case FailedEvent failed:
{
throw failed.Exception;
}
}
}
catch (Exception ex)
{
_log.Error(ex, "Error found in Webjob");
//throw it for the actual RabbitMqQueueReader Handler so message gets NACK
throw;
}
});
Thread.Sleep(Timeout.Infinite);
}
catch (Exception ex)
{
_log.Error(ex, "Error found");
throw;
}
}
这是相关的 IOC 代码(我们使用 Autofac + Akka.NET DI for Autofac)
builder.RegisterType<SupervisorActor>();
_actorSystem = new Lazy<ActorSystem>(() =>
{
var akkaconf = ActorUtil.LoadConfig(_akkaConfigPath).WithFallback(ConfigurationFactory.Default());
return ActorSystem.Create("WebJobSystem", akkaconf);
});
builder.Register<ActorSystem>(cont => _actorSystem.Value);
builder.Register(cont =>
{
var system = cont.Resolve<ActorSystem>();
return system.ActorOf(system.DI().Props<SupervisorActor>(),"SupervisorActor");
})
.SingleInstance()
.Named<IActorRef>("SupervisorActor");
问题
除了上面 WebJob 代码中显示的 Akka.Net "ask" 超时之外,代码工作正常并按照我们的要求执行。
烦人的是,如果我尝试在本地 运行 Web 作业,这似乎工作正常。我可以通过提供一个新的 supervisorActor 来模拟 "ask" 超时,它根本不会响应 "Sender" 的消息。
这在我的机器上 运行 完美运行,但是当我们在 Azure 中 运行 这段代码时,我们没有看到 "ask" 的超时,即使我们的工作流程之一运行s 超过 "ask" 超时一英里。
我只是不知道是什么导致了这种行为,有人有什么想法吗?
我需要为 WebJob 设置一些特定于 Azure 的配置值吗?
答案是使用显然出现在 C# rabbit 客户端 V5.0 中的异步 rabbit 处理程序。官方文档仍然显示同步用法(很遗憾)。
这篇文章很不错:https://gigi.nullneuron.net/gigilabs/asynchronous-rabbitmq-consumers-in-net/
一旦我们这样做了,一切都很好