在 AKKA.NET 参与者的任务中处理来自服务 运行 的事件

Handling Events from a Service running in a Task in an AKKA.NET actor

我正在为我的 WPF 应用程序使用 Prism 框架。我有一个在任务中运行并在找到文件时引发 CompositePresentationEvent 的生产者服务。我有一个订阅该事件的 Akka 演员。 actor 的 handler 看起来很简单,看起来像这样:

private void HandleFileReceive(FileEventArgs args)
{
    Self.Tell(new FileReceived(args.File));
}

当事件触发上述处理程序时,我收到 System.NotSupportedException 消息:"There is no active ActorContext, this is most likely due to use of async operations from within this actor"。

我认为这是因为服务 运行 在与 actor 的处理程序不同的线程中。在 Akka.NET 中有没有办法处理这种类型的事情?

我并不完全反对编写新的 Actors 来完成我的情况所需的服务工作。问题是,根据文件中的某些设置,服务会有所不同。目前,我正在使用 MEF 处理这个问题,并从 IoC 容器中获取给定接口的正确实现者。我想继续保持从核心代码(参与者所在的位置)中抽象出生产者的具体实现。

关于通过这个(感知到的)线程问题的任何建议and/or 动态生成实现给定接口的 ProducerActor?

谢谢

-g

我最终创建了一个 actor 来处理响应检索并更改生产者接口,因此具体的生产者只需要实现一个简单的方法。旧的生产者负责在检查响应之前等待配置驱动的时间量,但新代码利用 AKKA 的调度程序以相同的配置驱动间隔告诉检索器何时检查响应。我在下面提供了一个示例,但我的代码在错误处理等方面稍微复杂一些。

在下面的代码中,您可以看到 IProducer 的接口和具体实现:LocalProducer 和 EmailProducer。在实际代码中,这些具有告诉容器一些其他信息的属性,这些信息用于从容器中获取正确的实现。 ConsumerActor 是此场景的父角色,负责处理 Consume 消息。它在其构造函数中创建一个 ResponseRetrieverActor 并为 ResponseRetrieverActor 安排一个重复的 tell,以便检索器在给定的时间间隔检查响应。

在 ResponseRetrieverActor 的 RetrieveResponses 处理程序中是 IoC 魔法发生的地方。我从工厂方法中获取传输类型(此处未显示——它实际上是在各种配置文件中设置的)。传输类型用于使用上述具体生产者的属性从 IoC 容器中检索正确的生产者。最后,生产者用于获取响应并告知父级(即消费者)列表中的每个文件。

public interface IProducer
{
   List<string> GetResponses();
}

[Export(typeof(IProducer))] // Other attributes needed for MEF Export to differentiate the multiple IProducer implementations
public LocalProducer : IProducer
{
   public List<string> GetResponse()
   {
       // get files from a directory
   }
}

[Export(typeof(IProducer))]
public EmailProducer : IProducer
{
   public List<string> GetResponse()
   {
       // get files from email account
   }
}

public class ConsumerActor
{
    public class Consume
    {
       public Consume(string file) { this.File = file; }

       public string File { get; set; }
    }

    public ConsumerActor() 
    {
       _retriever = Context.ActorOf(Props.Create<ResponseRetrieverActor>(), "retriever");

       var interval = 10000;
       Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, interval, _retriever, new ResponseRetrieverActor.RetrieveResponses(), Self);
       Start(); 
    }

    private void Start()
    {
        Receive<Consume>(msg => 
        {
           // do something with the msg.File 
        });
    }

    private IActorRef _retriever;
}
public class ResponseRetrieverActor
{
  public class RetrieveResponses { }

  public ConsumerActor() { Start(); }

  private void Start()
  {
    Receive<RetrieveResponses>(msg => HandleRetrieveResponses());  
  }

  private void HandleRetrieveResponses()
  {
     var transportType = TransportFactory.GetTransportType(); // Gets transport protocol for the producer we need to use (Email, File, ect.)
     var producer = ServiceLocator.Current.GetInstance<IProducer>(transportType); // Gets a producer from the IoC container for the specified transportType

     var responses = producer.GetResponses();

     foreach(var response in responseFiles)
     {
         Context.Parent.Tell(new ConsumerActor.Consume(response));
     }
  }
}

我有一个类似的问题,我想 运行 进程到 运行 来自演员的命令行应用程序。问题是我想获得输出,这是通过处理 process.OutputDataReceived 来完成的。

我最后做的是使用自定义堆栈来放置来自处理程序的消息 process.OutputDataReceived += (sender, e) => Output.Push(e.Data);

自定义堆栈看起来像

class OutputStack<T> : Stack<T>
{
   public event EventHandler OnAdd;
   public void Push(T item)
   {
       base.Push(item);
       if (null != OnAdd) OnAdd(this, null);
   }
} 

然后在构造函数中我只处理自定义堆栈

OnAdd
OutputStack<string> Output = new OutputStack<string>();
Output.OnAdd += (sender, e) =>
{
    if (Output.Count > 0)
    {
        var message = Output.Pop();
        actor.Tell(new LogActor.LogMessage(message));
    }
};

它有点老套,但它确实有效,我可以在它发生时立即发送消息(并得到处理)。希望我能在未来修复它...

不确定这是否解决了 OP 问题,但是当我搜索“没有活动的 ActorContext,这很可能是由于使用了异步”时,我找到了这里,所以我将提供简单的解决方案我的问题在这里:

如果您在某个处理程序中等待某个异步调用,则该处理程序必须是异步的。如果异步处理程序未与 ReceiveAsync<> 而不是 Receive<> 一起使用,则会发生上述异常。 ReceiveAsync<> 在异步处理程序未完成时挂起参与者的邮箱。