WebSocket异步收发数据(使用NuGet websocket-client)
WebSocket send and receive data asynchronously (using NuGet websocket-client)
我正在努力使用 NuGet 包 websocket-client (https://github.com/Marfusios/websocket-client)
的观察者模式
与 WebSocket 服务器的连接稳定 运行。
每个请求在负载中都有一个请求 ID。客户端发送给服务器,服务器返回ID和真实数据。
在客户端,我需要将每个响应分配给相应的请求。
我以为我可以这样做:
public Task<Data> GetDataAsync()
{
var webSocket = new WebsocketClient(Uri);
await webSocket.Start();
var requestId = Guid.NewGuid();
var tcs = new TaskCompletionSource<Data>();
var disposable = webSocket
.MessageReceived
.Where(message => message.Text.Contains(requestId))
.Subscribe(message=>
{
var data = ParseData(message.Text);
tcs.SetResult(data);
});
return tcs.Task;
}
但它实际上从不跳转到订阅方法。我用错了吗?
你的代码比我需要的复杂得多。 Rx 让我们等待一个 observable 来获取最后产生的值。您可以这样编写代码:
public async Task<Data> GetDataAsync() =>
await
Observable
.Using(
() => new WebsocketClient(Uri),
ws =>
from x in Observable.FromAsync(() => ws.Start())
let requestId = Guid.NewGuid()
from m in ws.MessageReceived
where m.Text.Contains(requestId)
select ParseData(m.Text))
.Take(1)
.Timeout(TimeSpan.FromSeconds(5.0));
我觉得
public Task<Data> GetDataAsync(string request)
{
var requestId = Guid.NewGuid().ToString();
var responseTask = WebSocket
.MessageReceived
.Timeout(TimeSpan.FromSeconds(5))
.FirstOrDefaultAsync(message => message.Text.Contains(requestId));
WebSocket.Send(request);
var responseMessage = await responseTask;
return ParseMessage(responseMessage);
}
是必经之路。我什至更喜欢 SingleOrDefaultAsync
而不是 FirstOrDefaultAsync
因为只有一条消息具有该请求 ID。但这是行不通的。它总是在超时运行。
我正在努力使用 NuGet 包 websocket-client (https://github.com/Marfusios/websocket-client)
的观察者模式与 WebSocket 服务器的连接稳定 运行。
每个请求在负载中都有一个请求 ID。客户端发送给服务器,服务器返回ID和真实数据。
在客户端,我需要将每个响应分配给相应的请求。
我以为我可以这样做:
public Task<Data> GetDataAsync()
{
var webSocket = new WebsocketClient(Uri);
await webSocket.Start();
var requestId = Guid.NewGuid();
var tcs = new TaskCompletionSource<Data>();
var disposable = webSocket
.MessageReceived
.Where(message => message.Text.Contains(requestId))
.Subscribe(message=>
{
var data = ParseData(message.Text);
tcs.SetResult(data);
});
return tcs.Task;
}
但它实际上从不跳转到订阅方法。我用错了吗?
你的代码比我需要的复杂得多。 Rx 让我们等待一个 observable 来获取最后产生的值。您可以这样编写代码:
public async Task<Data> GetDataAsync() =>
await
Observable
.Using(
() => new WebsocketClient(Uri),
ws =>
from x in Observable.FromAsync(() => ws.Start())
let requestId = Guid.NewGuid()
from m in ws.MessageReceived
where m.Text.Contains(requestId)
select ParseData(m.Text))
.Take(1)
.Timeout(TimeSpan.FromSeconds(5.0));
我觉得
public Task<Data> GetDataAsync(string request)
{
var requestId = Guid.NewGuid().ToString();
var responseTask = WebSocket
.MessageReceived
.Timeout(TimeSpan.FromSeconds(5))
.FirstOrDefaultAsync(message => message.Text.Contains(requestId));
WebSocket.Send(request);
var responseMessage = await responseTask;
return ParseMessage(responseMessage);
}
是必经之路。我什至更喜欢 SingleOrDefaultAsync
而不是 FirstOrDefaultAsync
因为只有一条消息具有该请求 ID。但这是行不通的。它总是在超时运行。