Service Fabric 无状态服务器自定义 UDP 侦听器

Service Fabric Stateless Server Custom UDP Listener

我们正在尝试定义一个 Service Fabric 无状态服务,用于侦听 UDP 数据。

我们正在与 Microsoft 合作,Microsoft 表示它受支持并且我应该为 TCP 进行设置;下面是 ServiceManifest.xml 文件的片段:

<Resources>
    <Endpoints>
      <!-- This endpoint is used by the communication listener to obtain the port on which to 
           listen. Please note that if your service is partitioned, this port is shared with 
           replicas of different partitions that are placed in your code. -->
      <Endpoint Name="ServiceEndpoint" Protocol="tcp" Port="12345" Type="Input" />
    </Endpoints>
</Resources>

服务启动正常,但我无法让服务接收任何 UDP 数据,如果我执行 netstat -a,我看不到任何在 TCP 或 UDP 端口上侦听的内容。

我在网上做了很多研究,但我没有找到太多关于创建自定义 ICommunicationListener 的信息,但我希望其他人能够验证这是否可以通过 SF 实现。

这是 ICommunicationListener 实现:

public UdpCommunicationListener(string serviceEndPoint,
            ServiceInitializationParameters serviceInitializationParameters, Action<UdpReceiveResult> connector)
   {
       if (serviceInitializationParameters == null)
       {
           throw new ArgumentNullException(nameof(serviceInitializationParameters));
       }

       var endPoint = serviceInitializationParameters
            .CodePackageActivationContext
            .GetEndpoint(serviceEndPoint ?? "ServiceEndPoint");

       _connector = connector;

        _ipAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
        _port = endPoint.Port;

        _server = new UdpServer(_ipAddress, _port);

        _server.Open();
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        _listener = _server.Listen(_connector);

        return Task.FromResult($"udp::{_ipAddress}:{_port}");
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.Abort();

        return Task.FromResult(true);
    }

    public void Abort()
    {
        _listener.Dispose();
        _server?.Close();
    }
}

public class UdpServer
{
    private readonly UdpClient _udpClient;
    private IObservable<UdpReceiveResult> _receiveStream;

    public UdpServer(string ipAddress, int port)
    {
        Id = Guid.NewGuid();

        _udpClient = new UdpClient(ipAddress, port);
    }

    public Guid Id { get; }

    public void Open()
    {
        _receiveStream = _udpClient.ReceiveStream().Publish().RefCount();
    }

    public void Close()
    {
        //TODO: Not sure how to stop the process
    }

    public IDisposable Listen(Action<UdpReceiveResult> process)
    {
        return _receiveStream.Subscribe(async r =>
        {
                process(r);
        });
    }
}

我解决了 UdpServer 组件的缺陷,现在可以在 Service Fabric 服务中托管。

这一行的代码问题:

_udpClient = new UdpClient(ipAddress, port);

这是侦听流量的错误过载,需要:

_udpClient = new UdpClient(port);

我试过了:

_udpClient = new UdpClient(new IPAddress(IPAddress.Parse(_ipAddress)),port)

但这行不通;作为 Communication listen 中的一行(正如它自己描述的那样),它检索主机 returns 主机名而不是 IP 地址,我认为你可以通过对清单进行一些更改来改变这种行为,但仅端口就足够了现在。

因为只支持协议 http/https 和 tcp。我猜你不能做 udp 协议之类的事情。 UDP 不可靠。我们能够使用 SignalR,但我猜 Udp 不起作用。

编辑:您可以在我的其他 Post 中看到 Udp 现在正在运行。

我将 Udp 作为无状态服务工作。这是代码:

UdpService.cs

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Fabric;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;

namespace UdpService
{
    /// <summary>
    /// An instance of this class is created for each service instance by the Service Fabric runtime.
    /// </summary>
    internal sealed class UdpService : StatelessService
    {
        private UdpCommunicationListener listener;

        public UdpService(StatelessServiceContext context)
            : base(context)
        { }

        protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
        {
            yield return new ServiceInstanceListener(initParams =>
            {
                this.listener = new UdpCommunicationListener();
                this.listener.Initialize(initParams.CodePackageActivationContext);

                return this.listener;
            });
        }
    }
}

UdpCommunicationListener

using System;
using System.Diagnostics;
using System.Fabric;
using System.Fabric.Description;
using System.Globalization;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.ServiceFabric.Services.Communication.Runtime;

namespace UdpService
{
    public class UdpCommunicationListener : ICommunicationListener, IDisposable
    {
        private readonly CancellationTokenSource processRequestsCancellation = new CancellationTokenSource();

        public int Port { get; set; }

        private UdpClient server;

        /// <summary>
        /// Stops the Server Ungracefully
        /// </summary>
        public void Abort()
        {
            this.StopWebServer();
        }

        /// <summary>
        /// Stops the Server Gracefully
        /// </summary>
        /// <param name="cancellationToken">Cancellation Token</param>
        /// <returns>Task for Asynchron usage</returns>
        public Task CloseAsync(CancellationToken cancellationToken)
        {
            this.StopWebServer();

            return Task.FromResult(true);
        }

        /// <summary>
        /// Free Resources
        /// </summary>
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Initializes Configuration
        /// </summary>
        /// <param name="context">Code Package Activation Context</param>
        public void Initialize(ICodePackageActivationContext context)
        {
            EndpointResourceDescription serviceEndpoint = context.GetEndpoint("ServiceEndpoint");
            this.Port = serviceEndpoint.Port;
        }

        /// <summary>
        /// Starts the Server
        /// </summary>
        /// <param name="cancellationToken">Cancellation Token</param>
        /// <returns>Task for Asynchron usage</returns>
        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            try
            {
                this.server = new UdpClient(this.Port);
            }
            catch (Exception ex)
            {
            }

            ThreadPool.QueueUserWorkItem((state) =>
            {
                this.MessageHandling(this.processRequestsCancellation.Token);
            });

            return Task.FromResult("udp://" + FabricRuntime.GetNodeContext().IPAddressOrFQDN + ":" + this.Port);
        }

        protected void MessageHandling(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, this.Port);
                byte[] receivedBytes = this.server.Receive(ref ipEndPoint);
                this.server.Send(receivedBytes, receivedBytes.Length, ipEndPoint);
                Debug.WriteLine("Received bytes: " + receivedBytes.Length.ToString());
            }
        }

        /// <summary>
        /// Receives the specified endpoint.
        /// </summary>
        /// <param name="endpoint">The endpoint.</param>
        /// <returns></returns>
        public Task<byte[]> Receive(ref IPEndPoint endpoint)
        {
            return Task.FromResult(this.server.Receive(ref endpoint));
        }

        /// <summary>
        /// Free Resources and Stop Server
        /// </summary>
        /// <param name="disposing">Disposing .NET Resources?</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (this.server != null)
                {
                    try
                    {
                        this.server.Close();
                        this.server = null;
                    }
                    catch (Exception ex)
                    {
                        ServiceEventSource.Current.Message(ex.Message);
                    }
                }
            }
        }

        /// <summary>
        /// Stops Server and Free Handles
        /// </summary>
        private void StopWebServer()
        {
            this.processRequestsCancellation.Cancel();
            this.Dispose();
        }
    }
}

最后但并非最不重要的 ServiceManifest.xml

<?xml version="1.0" encoding="utf-8"?>
<ServiceManifest Name="UdpServicePkg"
                 Version="1.0.0"
                 xmlns="http://schemas.microsoft.com/2011/01/fabric"
                 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <ServiceTypes>
    <!-- This is the name of your ServiceType. 
         This name must match the string used in RegisterServiceType call in Program.cs. -->
    <StatelessServiceType ServiceTypeName="UdpServiceType" />
  </ServiceTypes>

  <!-- Code package is your service executable. -->
  <CodePackage Name="Code" Version="1.0.0">
    <EntryPoint>
      <ExeHost>
        <Program>UdpService.exe</Program>
      </ExeHost>
    </EntryPoint>
  </CodePackage>

  <!-- Config package is the contents of the Config directoy under PackageRoot that contains an 
       independently-updateable and versioned set of custom configuration settings for your service. -->
  <ConfigPackage Name="Config" Version="1.0.0" />

  <Resources>
    <Endpoints>
      <!-- This endpoint is used by the communication listener to obtain the port on which to 
           listen. Please note that if your service is partitioned, this port is shared with 
           replicas of different partitions that are placed in your code. -->
      <Endpoint Name="ServiceEndpoint" Port="5555" />
    </Endpoints>
  </Resources>
</ServiceManifest>