在 Azure Service Fabric 上设置 TCP
Setting up TCP on Azure Service Fabric
我需要设置一个有状态的 Service Fabric 应用程序来侦听 TCP 请求,然后将消息弹出到可靠队列。
有很多关于 HTTP 和 WCF 端点的示例,但我找不到任何关于简单 TCP 的示例。
在我的 ServiceManifest.xml
我有这个
<Endpoints>
<!-- This endpoint is used by the communication listener to obtain the port on which to
listen. Please note that if your service is partitioned, this port is shared with
replicas of different partitions that are placed in your code. -->
<Endpoint Name="ServiceEndpoint" />
<!-- This endpoint is used by the replicator for replicating the state of your service.
This endpoint is configured through a ReplicatorSettings config section in the Settings.xml
file under the ConfigPackage. -->
<Endpoint Name="ReplicatorEndpoint" />
<Endpoint Name="tcpEndpoint" Protocol="tcp" Port="10100"/>
</Endpoints>
我有一个实现 ICommunicationListener
的侦听器,称为 TcpCommunicationListener
public class TcpCommunicationListener : ICommunicationListener
{
private readonly ServiceEventSource eventSource;
private readonly ServiceContext serviceContext;
private readonly string endpointName;
private string listeningAddress;
private string hostAddress;
public TcpCommunicationListener(ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName)
{
if (serviceContext == null)
{
throw new ArgumentNullException(nameof(serviceContext));
}
if (endpointName == null)
{
throw new ArgumentNullException(nameof(endpointName));
}
if (eventSource == null)
{
throw new ArgumentNullException(nameof(eventSource));
}
this.serviceContext = serviceContext;
this.endpointName = endpointName;
this.eventSource = eventSource;
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
var serviceEndpoint = this.serviceContext.CodePackageActivationContext.GetEndpoint(this.endpointName);
var protocol = serviceEndpoint.Protocol;
int port = serviceEndpoint.Port;
//StatefulServiceContext statefulServiceContext = this.serviceContext as StatefulServiceContext;
this.hostAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
this.listeningAddress = string.Format(
CultureInfo.InvariantCulture,
"{0}://{1}:{2}",
protocol,
hostAddress,
port
);
try
{
this.eventSource.Message("Starting tcp listener " + this.listeningAddress);
return Task.FromResult(this.hostAddress);
}
catch (Exception ex)
{
this.eventSource.Message("Tcp Listener failed to open endpoint {0}. {1}", this.endpointName, ex.ToString());
throw;
}
}
public Task CloseAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public void Abort()
{
throw new NotImplementedException();
}
}
我还有一个 StatefulService
叫 ListenerService
internal sealed class ListenerService : StatefulService
{
public ListenerService(StatefulServiceContext context)
: base(context)
{
}
protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
var endpoints = Context.CodePackageActivationContext.GetEndpoints()
.Where(endpoint => endpoint.Protocol == EndpointProtocol.Tcp)
.Select(endpoint => endpoint.Name);
return endpoints.Select(endpoint => new ServiceReplicaListener(
serviceContext => new TcpCommunicationListener(serviceContext, ServiceEventSource.Current, endpoint), endpoint));
}
protected override async Task RunAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
//do i spin up a TcpListener here?
}
}
所以我的问题是如何获取正在接收的消息?
我是否必须在 ListenerService
的 RunAsync
方法中创建一个 TcpListener
?如果是这种情况,那么在 ServiceManifest.xml
?
中指定端点有什么意义?
或
我需要在 TcpCommunicationListener?
的 OpenAsync
方法中做些什么吗
现在您的通信侦听器代码仅在调用 OpenAsync
时发布一个 Uri。您还需要实际开始监听该端点。例如,您当时可以打开 Socket。
您也可以使用 WCF with a NetTcpBinding.
我需要设置一个有状态的 Service Fabric 应用程序来侦听 TCP 请求,然后将消息弹出到可靠队列。
有很多关于 HTTP 和 WCF 端点的示例,但我找不到任何关于简单 TCP 的示例。
在我的 ServiceManifest.xml
我有这个
<Endpoints>
<!-- This endpoint is used by the communication listener to obtain the port on which to
listen. Please note that if your service is partitioned, this port is shared with
replicas of different partitions that are placed in your code. -->
<Endpoint Name="ServiceEndpoint" />
<!-- This endpoint is used by the replicator for replicating the state of your service.
This endpoint is configured through a ReplicatorSettings config section in the Settings.xml
file under the ConfigPackage. -->
<Endpoint Name="ReplicatorEndpoint" />
<Endpoint Name="tcpEndpoint" Protocol="tcp" Port="10100"/>
</Endpoints>
我有一个实现 ICommunicationListener
的侦听器,称为 TcpCommunicationListener
public class TcpCommunicationListener : ICommunicationListener
{
private readonly ServiceEventSource eventSource;
private readonly ServiceContext serviceContext;
private readonly string endpointName;
private string listeningAddress;
private string hostAddress;
public TcpCommunicationListener(ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName)
{
if (serviceContext == null)
{
throw new ArgumentNullException(nameof(serviceContext));
}
if (endpointName == null)
{
throw new ArgumentNullException(nameof(endpointName));
}
if (eventSource == null)
{
throw new ArgumentNullException(nameof(eventSource));
}
this.serviceContext = serviceContext;
this.endpointName = endpointName;
this.eventSource = eventSource;
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
var serviceEndpoint = this.serviceContext.CodePackageActivationContext.GetEndpoint(this.endpointName);
var protocol = serviceEndpoint.Protocol;
int port = serviceEndpoint.Port;
//StatefulServiceContext statefulServiceContext = this.serviceContext as StatefulServiceContext;
this.hostAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
this.listeningAddress = string.Format(
CultureInfo.InvariantCulture,
"{0}://{1}:{2}",
protocol,
hostAddress,
port
);
try
{
this.eventSource.Message("Starting tcp listener " + this.listeningAddress);
return Task.FromResult(this.hostAddress);
}
catch (Exception ex)
{
this.eventSource.Message("Tcp Listener failed to open endpoint {0}. {1}", this.endpointName, ex.ToString());
throw;
}
}
public Task CloseAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public void Abort()
{
throw new NotImplementedException();
}
}
我还有一个 StatefulService
叫 ListenerService
internal sealed class ListenerService : StatefulService
{
public ListenerService(StatefulServiceContext context)
: base(context)
{
}
protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
var endpoints = Context.CodePackageActivationContext.GetEndpoints()
.Where(endpoint => endpoint.Protocol == EndpointProtocol.Tcp)
.Select(endpoint => endpoint.Name);
return endpoints.Select(endpoint => new ServiceReplicaListener(
serviceContext => new TcpCommunicationListener(serviceContext, ServiceEventSource.Current, endpoint), endpoint));
}
protected override async Task RunAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
//do i spin up a TcpListener here?
}
}
所以我的问题是如何获取正在接收的消息?
我是否必须在 ListenerService
的 RunAsync
方法中创建一个 TcpListener
?如果是这种情况,那么在 ServiceManifest.xml
?
或
我需要在 TcpCommunicationListener?
OpenAsync
方法中做些什么吗
现在您的通信侦听器代码仅在调用 OpenAsync
时发布一个 Uri。您还需要实际开始监听该端点。例如,您当时可以打开 Socket。
您也可以使用 WCF with a NetTcpBinding.