.Net Task Parallel Library 数据流如何通过不止一个缓存向多个客户端发送消息?
how .Net Task Parallel Library dataflow send message to many clients with not only one cache?
有一个服务器任务,它使用TPL Dataflow
向许多客户端任务发送消息。
- 客户端随机连接服务器
- 客户端可以向服务器发送消息,客户端可以从服务器接收消息
服务器使用BufferBlock<string>
向客户端发送消息,当客户端连接到服务器时,它从BufferBlock<string>
接收消息。
但是这个BufferBlock<string>
只能缓存一条消息,客户端不能向服务器请求多条消息,客户端不能设置条件select接收哪条消息.
我想要块类型,可以缓存多条消息;并且客户端不仅可以从该块类型中读取一条消息,而且客户端可以选择接收哪条消息;
我尝试了其他TPL Dataflow
方块类型,但没有人有这样的能力,TPL Dataflow
方块是否不适合这样的要求?
条件很简单,每条消息都有一个时间戳,客户端只需发送一个时间戳到服务器,然后服务器在时间戳之后发送 return 条消息。
using System;
using System.Web;
using System.Net;
using System.Threading.Tasks;
using System.Text;
using SimpleJSON;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;
namespace TestHttp
{
public class HttpServer
{
private HttpListener httpListener;
public Task task;
public HttpServer()
{
var ta = Task.Factory.StartNew(RunHttp);
task = ta.Result;
}
private async Task RunHttp()
{
var httpPort = 9090;
httpListener = new HttpListener();
httpListener.Prefixes.Add("http://*:"+httpPort+"/");
httpListener.Start();
while (httpListener.IsListening)
{
var context = await httpListener.GetContextAsync();
var req = context.Request;
Handle(context, req);
}
httpListener.Stop();
httpListener.Close();
}
private async Task Handle(HttpListenerContext context, HttpListenerRequest req)
{
Console.WriteLine(req.RawUrl);
var resp = await HandleGet(req);
var buf = Encoding.UTF8.GetBytes(resp);
context.Response.AddHeader("Content-Encoding", "utf-8");
context.Response.ContentEncoding = Encoding.UTF8;
context.Response.ContentLength64 = buf.Length;
try
{
context.Response.OutputStream.Write(buf, 0, buf.Length);
}
catch (Exception exp)
{
Console.WriteLine(exp.ToString());
}
finally
{
context.Response.OutputStream.Close();
}
}
private BufferBlock<string> messages = new BufferBlock<string>();
private async Task<string> HandleGet(HttpListenerRequest req)
{
var r = req.RawUrl.Split('?');
if (r[0] == "/send")
{
await messages.SendAsync(r[1]);
return "Suc";
}
else if(r[0] == "/receive"){
var timestamp = Convert.ToInt32(r[1]);
var ret = await messages.ReceiveAsync();
return ret;
}
//Console.WriteLine(r[0]);
return "Error";
}
}
}
为什么要说 BufferBlock
can contain only one value? It's not true, it can contain as many messages as you want to, based on the block creation options and, specifically, BoundedCapacity
选项。此参数的默认值为-1
,代表无限容量。
因此,在客户端连接的那一刻,您可以轻松获取所有按时间戳过滤的消息,并将它们返回给客户端。这可能会导致更改客户端请求的结果值的签名,因为您必须提供 TimeStamp
参数并提供 return 消息的 List<T>
和不是唯一的。没有任何代码我们不能对这个问题说更多。
我认为TPL数据流块不能满足这样的要求。
我只是用一个列表来保存所有消息List<Message> messages;
struct Message {
int id;
string msg;
}
我需要使用锁或 actor 模型类似的邮箱,来处理我的客户对 List<Message>
的请求。
有一个服务器任务,它使用TPL Dataflow
向许多客户端任务发送消息。
- 客户端随机连接服务器
- 客户端可以向服务器发送消息,客户端可以从服务器接收消息
服务器使用BufferBlock<string>
向客户端发送消息,当客户端连接到服务器时,它从BufferBlock<string>
接收消息。
但是这个BufferBlock<string>
只能缓存一条消息,客户端不能向服务器请求多条消息,客户端不能设置条件select接收哪条消息.
我想要块类型,可以缓存多条消息;并且客户端不仅可以从该块类型中读取一条消息,而且客户端可以选择接收哪条消息;
我尝试了其他TPL Dataflow
方块类型,但没有人有这样的能力,TPL Dataflow
方块是否不适合这样的要求?
条件很简单,每条消息都有一个时间戳,客户端只需发送一个时间戳到服务器,然后服务器在时间戳之后发送 return 条消息。
using System;
using System.Web;
using System.Net;
using System.Threading.Tasks;
using System.Text;
using SimpleJSON;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;
namespace TestHttp
{
public class HttpServer
{
private HttpListener httpListener;
public Task task;
public HttpServer()
{
var ta = Task.Factory.StartNew(RunHttp);
task = ta.Result;
}
private async Task RunHttp()
{
var httpPort = 9090;
httpListener = new HttpListener();
httpListener.Prefixes.Add("http://*:"+httpPort+"/");
httpListener.Start();
while (httpListener.IsListening)
{
var context = await httpListener.GetContextAsync();
var req = context.Request;
Handle(context, req);
}
httpListener.Stop();
httpListener.Close();
}
private async Task Handle(HttpListenerContext context, HttpListenerRequest req)
{
Console.WriteLine(req.RawUrl);
var resp = await HandleGet(req);
var buf = Encoding.UTF8.GetBytes(resp);
context.Response.AddHeader("Content-Encoding", "utf-8");
context.Response.ContentEncoding = Encoding.UTF8;
context.Response.ContentLength64 = buf.Length;
try
{
context.Response.OutputStream.Write(buf, 0, buf.Length);
}
catch (Exception exp)
{
Console.WriteLine(exp.ToString());
}
finally
{
context.Response.OutputStream.Close();
}
}
private BufferBlock<string> messages = new BufferBlock<string>();
private async Task<string> HandleGet(HttpListenerRequest req)
{
var r = req.RawUrl.Split('?');
if (r[0] == "/send")
{
await messages.SendAsync(r[1]);
return "Suc";
}
else if(r[0] == "/receive"){
var timestamp = Convert.ToInt32(r[1]);
var ret = await messages.ReceiveAsync();
return ret;
}
//Console.WriteLine(r[0]);
return "Error";
}
}
}
为什么要说 BufferBlock
can contain only one value? It's not true, it can contain as many messages as you want to, based on the block creation options and, specifically, BoundedCapacity
选项。此参数的默认值为-1
,代表无限容量。
因此,在客户端连接的那一刻,您可以轻松获取所有按时间戳过滤的消息,并将它们返回给客户端。这可能会导致更改客户端请求的结果值的签名,因为您必须提供 TimeStamp
参数并提供 return 消息的 List<T>
和不是唯一的。没有任何代码我们不能对这个问题说更多。
我认为TPL数据流块不能满足这样的要求。
我只是用一个列表来保存所有消息List<Message> messages;
struct Message {
int id;
string msg;
}
我需要使用锁或 actor 模型类似的邮箱,来处理我的客户对 List<Message>
的请求。