需要 Azure Service Fabric 上的 ICommunicationListener 的 ZeroMQ 实现
Need a ZeroMQ implementation of an ICommunicationListener on Azure Service Fabric
我正在寻找 ICommunicationListener
的 ZeroMQ 实现,我可以将它与服务结构一起使用到 运行 Azure 上的 ZeroMQ 端点。
我找了几个小时都找不到。有谁知道这个的解决方案?我目前使用 "Service App Fabric / .net core 2.0 stateless service" 模板,
这让我可以覆盖
IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
,
当我有 ZeroMQ 的 ICommunicationListener
实现时,
或覆盖 Task RunAsync(CancellationToken cancellationToken)
,
当我想自己设置套接字时。
我的第一次尝试行不通:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
using (var server = new ResponseSocket("tcp://xx.xx.xx.xx:xxxxx"))
{
while (!cancellationToken.IsCancellationRequested)
{
var message = server.ReceiveFrameBytes();
ServiceEventSource.Current.ServiceMessage(this.Context, "Message {0}",
System.Text.Encoding.UTF8.GetString(message));
}
}
}
以上结果是无法启动的服务。找不到太多记录,除此之外:
"There was an error during CodePackage activation.The service host terminated with exit code:255"
如果 none 存在,您可以通过创建 ICommunicationListener
的实现并从 CreateServiceInstanceListeners
返回它来创建自己的。
使用 OpenAsync
打开频道并开始收听。使用 CloseAsync
停止收听。
看看 this implementation for Service Bus,寻找灵感。
这里是一个 ICommunicationListener
ZeroMQ 实现的粗略示例。此实现将充当 ZeroMQ ResponseSocket
,但可以轻松更改为 RequestSocket
、SubscriberSocket
或您喜欢的任何类型的 NetMQ.Sockets.*
套接字实现。当然,它在实现中需要更多细节,比如在检索消息时不抛出异常,但它应该清楚地说明它是如何完成的。它受到 ICommunicationListener
接口的现有 dotnetcore 实现的极大启发。
public class ZeroMqResponseSocketCommunicationListener : ICommunicationListener, IDisposable
{
private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
private readonly ResponseSocket _responseSocket = new ResponseSocket();
private readonly ServiceContext _serviceContext;
private readonly string _endpointName;
public ZeroMqResponseSocketCommunicationListener(ServiceContext serviceContext, string endpointName)
{
if (string.IsNullOrEmpty(endpointName))
throw new ArgumentException("endpointName cannot be null or empty string.");
_serviceContext = serviceContext;
_endpointName = endpointName;
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
var address = GetListenerUrl();
if (address == null)
throw new InvalidOperationException("No Url returned from ZeroMqResponseSocketCommunicationListener.GetListenerUrl");
_responseSocket.Bind(address);
ThreadPool.QueueUserWorkItem(state => MessageHandler(_cancellationToken.Token));
return Task.FromResult(address);
}
public Task CloseAsync(CancellationToken cancellationToken)
{
_responseSocket.Close();
return Task.FromResult(true);
}
public void Abort()
{
_responseSocket.Close();
}
private void MessageHandler(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var message = _responseSocket.ReceiveFrameBytes();
if (message != null)
throw new Exception($"Message {Encoding.UTF8.GetString(message)}");
}
}
private string GetListenerUrl()
{
var endpoints = _serviceContext.CodePackageActivationContext.GetEndpoints();
if (!endpoints.Contains(_endpointName))
throw new InvalidOperationException($"{_endpointName} not found in Service Manifest.");
var serviceEndpoint = _serviceContext.CodePackageActivationContext.GetEndpoint(_endpointName);
if (string.IsNullOrEmpty(serviceEndpoint.IpAddressOrFqdn))
throw new InvalidOperationException("IpAddressOrFqdn not set on endpoint");
if (serviceEndpoint.Port <= 0)
throw new InvalidOperationException("Port not set on endpoint");
var listenUrl = $"{serviceEndpoint.Protocol.ToString().ToLower()}://{serviceEndpoint.IpAddressOrFqdn}:{serviceEndpoint.Port}";
return listenUrl;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing || _responseSocket == null) return;
try
{
_responseSocket.Close();
_responseSocket.Dispose();
}
catch (Exception ex)
{
ServiceEventSource.Current.Message(ex.Message);
}
}
}
并且return您的应用结构服务中的 ZeroMqResponseSocketCommunicationListener:
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
yield return new ServiceInstanceListener(listener => new ZeroMqResponseSocketCommunicationListener(listener, "EndpointName"));
}
确保您在服务的 ServiceManifest.xml 中指定了端点:
<Resources>
<Endpoints>
<Endpoint Name="EndpointName" Port="80" Protocol="tcp" />
</Endpoints>
</Resources>
我正在寻找 ICommunicationListener
的 ZeroMQ 实现,我可以将它与服务结构一起使用到 运行 Azure 上的 ZeroMQ 端点。
我找了几个小时都找不到。有谁知道这个的解决方案?我目前使用 "Service App Fabric / .net core 2.0 stateless service" 模板,
这让我可以覆盖
IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
,
当我有 ZeroMQ 的 ICommunicationListener
实现时,
或覆盖 Task RunAsync(CancellationToken cancellationToken)
,
当我想自己设置套接字时。
我的第一次尝试行不通:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
using (var server = new ResponseSocket("tcp://xx.xx.xx.xx:xxxxx"))
{
while (!cancellationToken.IsCancellationRequested)
{
var message = server.ReceiveFrameBytes();
ServiceEventSource.Current.ServiceMessage(this.Context, "Message {0}",
System.Text.Encoding.UTF8.GetString(message));
}
}
}
以上结果是无法启动的服务。找不到太多记录,除此之外:
"There was an error during CodePackage activation.The service host terminated with exit code:255"
如果 none 存在,您可以通过创建 ICommunicationListener
的实现并从 CreateServiceInstanceListeners
返回它来创建自己的。
使用 OpenAsync
打开频道并开始收听。使用 CloseAsync
停止收听。
看看 this implementation for Service Bus,寻找灵感。
这里是一个 ICommunicationListener
ZeroMQ 实现的粗略示例。此实现将充当 ZeroMQ ResponseSocket
,但可以轻松更改为 RequestSocket
、SubscriberSocket
或您喜欢的任何类型的 NetMQ.Sockets.*
套接字实现。当然,它在实现中需要更多细节,比如在检索消息时不抛出异常,但它应该清楚地说明它是如何完成的。它受到 ICommunicationListener
接口的现有 dotnetcore 实现的极大启发。
public class ZeroMqResponseSocketCommunicationListener : ICommunicationListener, IDisposable
{
private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
private readonly ResponseSocket _responseSocket = new ResponseSocket();
private readonly ServiceContext _serviceContext;
private readonly string _endpointName;
public ZeroMqResponseSocketCommunicationListener(ServiceContext serviceContext, string endpointName)
{
if (string.IsNullOrEmpty(endpointName))
throw new ArgumentException("endpointName cannot be null or empty string.");
_serviceContext = serviceContext;
_endpointName = endpointName;
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
var address = GetListenerUrl();
if (address == null)
throw new InvalidOperationException("No Url returned from ZeroMqResponseSocketCommunicationListener.GetListenerUrl");
_responseSocket.Bind(address);
ThreadPool.QueueUserWorkItem(state => MessageHandler(_cancellationToken.Token));
return Task.FromResult(address);
}
public Task CloseAsync(CancellationToken cancellationToken)
{
_responseSocket.Close();
return Task.FromResult(true);
}
public void Abort()
{
_responseSocket.Close();
}
private void MessageHandler(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var message = _responseSocket.ReceiveFrameBytes();
if (message != null)
throw new Exception($"Message {Encoding.UTF8.GetString(message)}");
}
}
private string GetListenerUrl()
{
var endpoints = _serviceContext.CodePackageActivationContext.GetEndpoints();
if (!endpoints.Contains(_endpointName))
throw new InvalidOperationException($"{_endpointName} not found in Service Manifest.");
var serviceEndpoint = _serviceContext.CodePackageActivationContext.GetEndpoint(_endpointName);
if (string.IsNullOrEmpty(serviceEndpoint.IpAddressOrFqdn))
throw new InvalidOperationException("IpAddressOrFqdn not set on endpoint");
if (serviceEndpoint.Port <= 0)
throw new InvalidOperationException("Port not set on endpoint");
var listenUrl = $"{serviceEndpoint.Protocol.ToString().ToLower()}://{serviceEndpoint.IpAddressOrFqdn}:{serviceEndpoint.Port}";
return listenUrl;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing || _responseSocket == null) return;
try
{
_responseSocket.Close();
_responseSocket.Dispose();
}
catch (Exception ex)
{
ServiceEventSource.Current.Message(ex.Message);
}
}
}
并且return您的应用结构服务中的 ZeroMqResponseSocketCommunicationListener:
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
yield return new ServiceInstanceListener(listener => new ZeroMqResponseSocketCommunicationListener(listener, "EndpointName"));
}
确保您在服务的 ServiceManifest.xml 中指定了端点:
<Resources>
<Endpoints>
<Endpoint Name="EndpointName" Port="80" Protocol="tcp" />
</Endpoints>
</Resources>