Akka.Net 流和远程处理 (Sink.ActorRefWithAck)
Akka.Net Streams and remoting (Sink.ActorRefWithAck)
我用 Akka.net Streams 使用 Sink.ActorRefWithAck
实现了一个非常简单的实现:订阅者向发布者请求一个大字符串,发布者通过切片发送它。
它在本地 (UT) 上工作得很好,但 不能远程 。我不明白怎么了?具体来说:订阅者能够将请求发送给发布者,发布者用 OnInit
消息进行响应,但是 OnInit.Ack
将永远不会返回给发布者。此 Ack
消息最终成为 死信 :
INFO Akka.Actor.EmptyLocalActorRef - Message Ack from akka.tcp://OutOfProcessTaskProcessing@localhost:12100/user/Streamer_636568240846733287 to akka://OutOfProcessTaskProcessing/user/StreamSupervisor-0/StageActorRef-0 was not delivered. 1 dead letters encountered.
请注意,日志来自目标参与者,因此消息是在正确的进程中处理的。没有明显的路径错误。
看了一下没有处理这个消息的publisher代码,真不知道自己做错了什么:
public static void ReplyWithStreamedString(IUntypedActorContext context, string toStream, int chunkSize = 2000)
{
Source<string, NotUsed> source = Source.From(toStream.SplitBy(chunkSize));
source.To(Sink.ActorRefWithAck<string>(context.Sender, new StreamMessage.OnInit(),
new StreamMessage.OnInit.Ack(),
new StreamMessage.Completed(),
exception => new StreamMessage.Failure(exception.Message)))
.Run(context.System.Materializer());
}
这是订阅者代码:
public static Task<string> AskStreamedString(this ICanTell self, object message, ActorSystem context, TimeSpan? timeout = null)
{
var tcs = new TaskCompletionSource<string>();
if (timeout.HasValue)
{
CancellationTokenSource ct = new CancellationTokenSource(timeout.Value);
ct.Token.Register(() => tcs.TrySetCanceled());
}
var props = Props.Create(() => new StreamerActorRef(tcs));
var tempActor = context.ActorOf(props, $"Streamer_{DateTime.Now.Ticks}");
self.Tell(message, tempActor);
return tcs.Task.ContinueWith(task =>
{
context.Stop(tempActor);
if(task.IsCanceled)
throw new OperationCanceledException();
if (task.IsFaulted)
throw task.Exception.GetBaseException();
return task.Result;
});
}
internal class StreamerActorRef : ReceiveActor
{
readonly TaskCompletionSource<string> _tcs;
private readonly StringBuilder _stringBuilder = new StringBuilder();
public StreamerActorRef(TaskCompletionSource<string> tcs)
{
_tcs = tcs;
Ready();
}
private void Ready()
{
ReceiveAny(message =>
{
switch (message)
{
case StreamMessage.OnInit _:
Sender.Tell(new StreamMessage.OnInit.Ack());
break;
case StreamMessage.Completed _:
string result = _stringBuilder.ToString();
_tcs.TrySetResult(result);
break;
case string slice:
_stringBuilder.Append(slice);
Sender.Tell(new StreamMessage.OnInit.Ack());
break;
case StreamMessage.Failure error:
_tcs.TrySetException(new InvalidOperationException(error.Reason));
break;
}
});
}
}
有消息:
public class StreamMessage
{
public class OnInit
{
public class Ack{}
}
public class Completed { }
public class Failure
{
public string Reason { get; }
public Failure(string reason)
{
Reason = reason;
}
}
}
一般来说,使用 actor 引用的源和接收器并未设计为在远程连接上工作 - 它们不包括消息重试,如果某些流控制消息未传入,这可能会导致系统死锁.
您正在寻找的功能称为 StreamRefs(它的工作方式类似于 actor refs,但用于流),并将作为 v1.4 版本的一部分发布(请参阅github pull request 了解更多详情)。
我用 Akka.net Streams 使用 Sink.ActorRefWithAck
实现了一个非常简单的实现:订阅者向发布者请求一个大字符串,发布者通过切片发送它。
它在本地 (UT) 上工作得很好,但 不能远程 。我不明白怎么了?具体来说:订阅者能够将请求发送给发布者,发布者用 OnInit
消息进行响应,但是 OnInit.Ack
将永远不会返回给发布者。此 Ack
消息最终成为 死信 :
INFO Akka.Actor.EmptyLocalActorRef - Message Ack from akka.tcp://OutOfProcessTaskProcessing@localhost:12100/user/Streamer_636568240846733287 to akka://OutOfProcessTaskProcessing/user/StreamSupervisor-0/StageActorRef-0 was not delivered. 1 dead letters encountered.
请注意,日志来自目标参与者,因此消息是在正确的进程中处理的。没有明显的路径错误。
看了一下没有处理这个消息的publisher代码,真不知道自己做错了什么:
public static void ReplyWithStreamedString(IUntypedActorContext context, string toStream, int chunkSize = 2000)
{
Source<string, NotUsed> source = Source.From(toStream.SplitBy(chunkSize));
source.To(Sink.ActorRefWithAck<string>(context.Sender, new StreamMessage.OnInit(),
new StreamMessage.OnInit.Ack(),
new StreamMessage.Completed(),
exception => new StreamMessage.Failure(exception.Message)))
.Run(context.System.Materializer());
}
这是订阅者代码:
public static Task<string> AskStreamedString(this ICanTell self, object message, ActorSystem context, TimeSpan? timeout = null)
{
var tcs = new TaskCompletionSource<string>();
if (timeout.HasValue)
{
CancellationTokenSource ct = new CancellationTokenSource(timeout.Value);
ct.Token.Register(() => tcs.TrySetCanceled());
}
var props = Props.Create(() => new StreamerActorRef(tcs));
var tempActor = context.ActorOf(props, $"Streamer_{DateTime.Now.Ticks}");
self.Tell(message, tempActor);
return tcs.Task.ContinueWith(task =>
{
context.Stop(tempActor);
if(task.IsCanceled)
throw new OperationCanceledException();
if (task.IsFaulted)
throw task.Exception.GetBaseException();
return task.Result;
});
}
internal class StreamerActorRef : ReceiveActor
{
readonly TaskCompletionSource<string> _tcs;
private readonly StringBuilder _stringBuilder = new StringBuilder();
public StreamerActorRef(TaskCompletionSource<string> tcs)
{
_tcs = tcs;
Ready();
}
private void Ready()
{
ReceiveAny(message =>
{
switch (message)
{
case StreamMessage.OnInit _:
Sender.Tell(new StreamMessage.OnInit.Ack());
break;
case StreamMessage.Completed _:
string result = _stringBuilder.ToString();
_tcs.TrySetResult(result);
break;
case string slice:
_stringBuilder.Append(slice);
Sender.Tell(new StreamMessage.OnInit.Ack());
break;
case StreamMessage.Failure error:
_tcs.TrySetException(new InvalidOperationException(error.Reason));
break;
}
});
}
}
有消息:
public class StreamMessage
{
public class OnInit
{
public class Ack{}
}
public class Completed { }
public class Failure
{
public string Reason { get; }
public Failure(string reason)
{
Reason = reason;
}
}
}
一般来说,使用 actor 引用的源和接收器并未设计为在远程连接上工作 - 它们不包括消息重试,如果某些流控制消息未传入,这可能会导致系统死锁.
您正在寻找的功能称为 StreamRefs(它的工作方式类似于 actor refs,但用于流),并将作为 v1.4 版本的一部分发布(请参阅github pull request 了解更多详情)。