Akka.NET 的简单 TCP 服务器
Simple TCP server with Akka.NET
您好,我正在学习使用 Akka.net,我想做的是创建一个简单的 TCP 服务器,它会定期向 tcp 连接发送数据。 (然后将由 processingjs 客户端拾取并显示在输出中)
不确定我在这里遗漏了什么。你们中的任何专家都可以阐明这个问题吗?
这些是我的演员:
using Akka.Actor;
using Akka.IO;
using System;
using System.Text;
using System.Threading.Tasks;
namespace ActorTcpProcessing
{
public class ProcessingServer : ReceiveActor
{
public ProcessingServer()
{
Receive<Tcp.Bound>(TcpBoundHandler);
Receive<Tcp.Connected>(TcpConnectedHandler);
}
internal static Props Create()
{
return Props.Create(() => new ProcessingServer());
}
private void TcpBoundHandler(Tcp.Bound bound)
{
Console.WriteLine("Listening on {0}", bound.LocalAddress);
}
private void TcpConnectedHandler(Tcp.Connected connected)
{
var tcpConnection = Sender;
var connectionHandler = Context.ActorOf(SensorReader.CreateReader(tcpConnection));
tcpConnection.Tell(new Tcp.Register(connectionHandler));
}
}
internal class SensorReader : ReceiveActor
{
private readonly IActorRef _tcpConnection;
public SensorReader(IActorRef tcpConnection)
{
_tcpConnection = tcpConnection;
Receive<SensorDataReceived>(TcpWriteSensorData);
}
internal static Props CreateReader(IActorRef connectionHandler)
{
return Props.Create(() => new SensorReader(connectionHandler));
}
private void TcpWriteSensorData(SensorDataReceived receivedData)
{
_tcpConnection.Tell(Tcp.Write.Create(ByteString.FromString(receivedData.SensorData + "\n")));
}
}
}
我的印象是,当我在 Tcp.Connected
消息上注册 SensorReader
时,我需要做的就是向流发送 SensorData
消息。
因此:_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
但是,这似乎不起作用。
我收到以下日志消息:
[INFO][11/09/2021 10:22:05 AM][Thread 0004][akka://tcp-processing/deadLetters] Message [Bound] from [akka://tcp-processing/system/IO-TCP/$a#1070400244] to [akka://tcp-processing/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://tcp-processing/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
这就是我向服务器发送 Tcp.Bind 消息的方式。
using System;
using Akka.IO;
using Akka.Actor;
using System.Net;
using Topshelf;
namespace ActorTcpProcessing
{
class StreamTcpService : ServiceControl
{
ActorSystem _sys;
public bool Start(HostControl hostControl)
{
_sys = ActorSystem.Create("tcp-processing");
var server = _sys.ActorOf(ProcessingServer.Create(), nameof(ProcessingServer));
var tcpManager = _sys.Tcp();
tcpManager.Tell(new Tcp.Bind(
server,
new IPEndPoint(IPAddress.Any, 10002)));
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
return true;
}
public bool Stop(HostControl hostControl)
{
_sys.Terminate().ContinueWith(task =>
{
if (task.IsCompleted && !task.IsCanceled)
Console.WriteLine("Stream Tcp stopped");
});
return true;
}
}
}
如您所见,我将 server
指定为绑定消息的处理程序。还是我完全错了?
此外,我必须提到,当我启动我的 processingjs 客户端时,我确实收到了一条 Tcp.Connected
消息到我的 ProcessingServer
,它又创建了 SensorReader
并注册为处理程序。
我还想知道我是否正确使用 EventStream
发布?
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
非常感谢您对此的任何帮助。
谢谢。
您的问题在行:
tcpManager.Tell(new Tcp.Bind(server,new IPEndPoint(IPAddress.Any, 10002)));
这有点误导,但是 Tcp.Bind
的构造函数中的 server
是 Tcp.Connected
形式的所有传入连接的处理程序,而不是将接收Tcp.Bound
留言。
Tcp
演员将用 Tcp.Bound
回复 Tcp.Bind
消息的 Sender
。要解决您的问题,您应该传递预期接收 Tcp.Bound
消息的 Actor 的 IActorRef
引用,而不是原始发件人。在你的情况下:
tcpManager.Tell(new Tcp.Bind(server, new IPEndPoint(IPAddress.Any, 10002)), server);
您好,我正在学习使用 Akka.net,我想做的是创建一个简单的 TCP 服务器,它会定期向 tcp 连接发送数据。 (然后将由 processingjs 客户端拾取并显示在输出中)
不确定我在这里遗漏了什么。你们中的任何专家都可以阐明这个问题吗?
这些是我的演员:
using Akka.Actor;
using Akka.IO;
using System;
using System.Text;
using System.Threading.Tasks;
namespace ActorTcpProcessing
{
public class ProcessingServer : ReceiveActor
{
public ProcessingServer()
{
Receive<Tcp.Bound>(TcpBoundHandler);
Receive<Tcp.Connected>(TcpConnectedHandler);
}
internal static Props Create()
{
return Props.Create(() => new ProcessingServer());
}
private void TcpBoundHandler(Tcp.Bound bound)
{
Console.WriteLine("Listening on {0}", bound.LocalAddress);
}
private void TcpConnectedHandler(Tcp.Connected connected)
{
var tcpConnection = Sender;
var connectionHandler = Context.ActorOf(SensorReader.CreateReader(tcpConnection));
tcpConnection.Tell(new Tcp.Register(connectionHandler));
}
}
internal class SensorReader : ReceiveActor
{
private readonly IActorRef _tcpConnection;
public SensorReader(IActorRef tcpConnection)
{
_tcpConnection = tcpConnection;
Receive<SensorDataReceived>(TcpWriteSensorData);
}
internal static Props CreateReader(IActorRef connectionHandler)
{
return Props.Create(() => new SensorReader(connectionHandler));
}
private void TcpWriteSensorData(SensorDataReceived receivedData)
{
_tcpConnection.Tell(Tcp.Write.Create(ByteString.FromString(receivedData.SensorData + "\n")));
}
}
}
我的印象是,当我在 Tcp.Connected
消息上注册 SensorReader
时,我需要做的就是向流发送 SensorData
消息。
因此:_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
但是,这似乎不起作用。 我收到以下日志消息:
[INFO][11/09/2021 10:22:05 AM][Thread 0004][akka://tcp-processing/deadLetters] Message [Bound] from [akka://tcp-processing/system/IO-TCP/$a#1070400244] to [akka://tcp-processing/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://tcp-processing/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
这就是我向服务器发送 Tcp.Bind 消息的方式。
using System;
using Akka.IO;
using Akka.Actor;
using System.Net;
using Topshelf;
namespace ActorTcpProcessing
{
class StreamTcpService : ServiceControl
{
ActorSystem _sys;
public bool Start(HostControl hostControl)
{
_sys = ActorSystem.Create("tcp-processing");
var server = _sys.ActorOf(ProcessingServer.Create(), nameof(ProcessingServer));
var tcpManager = _sys.Tcp();
tcpManager.Tell(new Tcp.Bind(
server,
new IPEndPoint(IPAddress.Any, 10002)));
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
return true;
}
public bool Stop(HostControl hostControl)
{
_sys.Terminate().ContinueWith(task =>
{
if (task.IsCompleted && !task.IsCanceled)
Console.WriteLine("Stream Tcp stopped");
});
return true;
}
}
}
如您所见,我将 server
指定为绑定消息的处理程序。还是我完全错了?
此外,我必须提到,当我启动我的 processingjs 客户端时,我确实收到了一条 Tcp.Connected
消息到我的 ProcessingServer
,它又创建了 SensorReader
并注册为处理程序。
我还想知道我是否正确使用 EventStream
发布?
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
非常感谢您对此的任何帮助。 谢谢。
您的问题在行:
tcpManager.Tell(new Tcp.Bind(server,new IPEndPoint(IPAddress.Any, 10002)));
这有点误导,但是 Tcp.Bind
的构造函数中的 server
是 Tcp.Connected
形式的所有传入连接的处理程序,而不是将接收Tcp.Bound
留言。
Tcp
演员将用 Tcp.Bound
回复 Tcp.Bind
消息的 Sender
。要解决您的问题,您应该传递预期接收 Tcp.Bound
消息的 Actor 的 IActorRef
引用,而不是原始发件人。在你的情况下:
tcpManager.Tell(new Tcp.Bind(server, new IPEndPoint(IPAddress.Any, 10002)), server);