IObservable 与 NetMQ 接收
IObservable with NetMQ receive
我正在尝试编写一个典型的股票交易程序,它从 netmq 接收股票 tickers/orders/trades,将流转换为 IObservable,并在 WPF 前端显示它们。我尝试将 async/await 与 NetMQ 一起使用来阻止 ReceiveString(假设我期待一些字符串输入),以便 ReceiveString 循环不会阻止主(UI)线程。因为我还是 C# 的新手,所以我在这个 post: (https://social.msdn.microsoft.com/Forums/en-US/b0cf96b0-d23e-4461-9d2b-ca989be678dc/where-is-iasyncenumerable-in-the-lastest-release?forum=rx) 中接受了 Dave Sexton 的回答,并尝试写一些这样的例子:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive;
using System.Reactive.Linq;
namespace App1
{
class MainClass
{
// publisher for testing, should be an external data publisher in real environment
public static Thread StartPublisher(PublisherSocket s)
{
s.Bind("inproc://test");
var thr = new Thread(() => {
Console.WriteLine("Start publishing...");
while (true) {
Thread.Sleep(500);
s.Send("hello");
}
});
thr.Start();
return thr;
}
public static IObservable<string> Receive(SubscriberSocket s)
{
s.Connect("inproc://test");
s.Subscribe("");
return Observable.Create<string>(
async observer =>
{
while (true)
{
var result = await s.ReceiveString();
observer.OnNext(result);
}
});
}
public static void Main(string[] args)
{
var ctx = NetMQContext.Create();
var sub = ctx.CreateSubscriberSocket();
var pub = ctx.CreatePublisherSocket();
StartPublisher(pub);
Receive(sub).Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
}
使用"cannot await string"编译失败。虽然我知道它可能需要一个任务,但我不太清楚如何完成整个任务。
再次总结:我想要实现的只是使用简单的阻塞 api 从 netmq 获取 ticker/orders/trades 的 IObservable 流,但不会真正阻塞主线程。
我能用它做什么?非常感谢。
ReceiveString 不可等待,所以只需删除 await。
您还可以在这里阅读如何制作可等待的套接字:
http://somdoron.com/2014/08/netmq-asp-net/
并查看以下有关将 NetMQ 与 RX 结合使用的文章:
http://www.codeproject.com/Articles/853841/NetMQ-plus-RX-Streaming-Data-Demo-App
我不熟悉 NetMQ,但你真的应该像这样构造你的 observable:
public static IObservable<string> Receive(NetMQContext ctx)
{
return Observable
.Create<string>(o =>
Observable.Using<string, SubscriberSocket>(() =>
{
var sub = ctx.CreateSubscriberSocket();
sub.Connect("inproc://test");
sub.Subscribe("");
return sub;
}, sub =>
Observable
.FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
h => sub.ReceiveReady += h,
h => sub.ReceiveReady -= h)
.Select(x => sub.ReceiveString()))
.Subscribe(o));
}
这将自动为您创建一个 SubscriberSocket
,当 observable 结束时,将在您的套接字上自动调用 .Dispose()
。
就像我说的,我不熟悉 NetMQ,所以上面的代码没有收到任何发布的消息,所以你需要 fiddle 让它工作,但是这个是一个很好的起点。
我正在尝试编写一个典型的股票交易程序,它从 netmq 接收股票 tickers/orders/trades,将流转换为 IObservable,并在 WPF 前端显示它们。我尝试将 async/await 与 NetMQ 一起使用来阻止 ReceiveString(假设我期待一些字符串输入),以便 ReceiveString 循环不会阻止主(UI)线程。因为我还是 C# 的新手,所以我在这个 post: (https://social.msdn.microsoft.com/Forums/en-US/b0cf96b0-d23e-4461-9d2b-ca989be678dc/where-is-iasyncenumerable-in-the-lastest-release?forum=rx) 中接受了 Dave Sexton 的回答,并尝试写一些这样的例子:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive;
using System.Reactive.Linq;
namespace App1
{
class MainClass
{
// publisher for testing, should be an external data publisher in real environment
public static Thread StartPublisher(PublisherSocket s)
{
s.Bind("inproc://test");
var thr = new Thread(() => {
Console.WriteLine("Start publishing...");
while (true) {
Thread.Sleep(500);
s.Send("hello");
}
});
thr.Start();
return thr;
}
public static IObservable<string> Receive(SubscriberSocket s)
{
s.Connect("inproc://test");
s.Subscribe("");
return Observable.Create<string>(
async observer =>
{
while (true)
{
var result = await s.ReceiveString();
observer.OnNext(result);
}
});
}
public static void Main(string[] args)
{
var ctx = NetMQContext.Create();
var sub = ctx.CreateSubscriberSocket();
var pub = ctx.CreatePublisherSocket();
StartPublisher(pub);
Receive(sub).Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
}
使用"cannot await string"编译失败。虽然我知道它可能需要一个任务,但我不太清楚如何完成整个任务。
再次总结:我想要实现的只是使用简单的阻塞 api 从 netmq 获取 ticker/orders/trades 的 IObservable 流,但不会真正阻塞主线程。
我能用它做什么?非常感谢。
ReceiveString 不可等待,所以只需删除 await。
您还可以在这里阅读如何制作可等待的套接字: http://somdoron.com/2014/08/netmq-asp-net/
并查看以下有关将 NetMQ 与 RX 结合使用的文章:
http://www.codeproject.com/Articles/853841/NetMQ-plus-RX-Streaming-Data-Demo-App
我不熟悉 NetMQ,但你真的应该像这样构造你的 observable:
public static IObservable<string> Receive(NetMQContext ctx)
{
return Observable
.Create<string>(o =>
Observable.Using<string, SubscriberSocket>(() =>
{
var sub = ctx.CreateSubscriberSocket();
sub.Connect("inproc://test");
sub.Subscribe("");
return sub;
}, sub =>
Observable
.FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
h => sub.ReceiveReady += h,
h => sub.ReceiveReady -= h)
.Select(x => sub.ReceiveString()))
.Subscribe(o));
}
这将自动为您创建一个 SubscriberSocket
,当 observable 结束时,将在您的套接字上自动调用 .Dispose()
。
就像我说的,我不熟悉 NetMQ,所以上面的代码没有收到任何发布的消息,所以你需要 fiddle 让它工作,但是这个是一个很好的起点。