NamedPipeServerStream/NamedPipeClientStream 包装纸
NamedPipeServerStream/NamedPipeClientStream wrapper
我目前正在为 NamedPipeServerStream
/NamedPipeClientStream
编写一个完全基于 Event
的小型包装器,而不是使用 AsyncCallbacks
.
我公开了几乎所有可能的同步和异步方法(connecting/waiting 用于连接、写入等)所以如果消费者想要,例如,启动服务器实例并在客户端发送消息时连接他可以完全同步路线并做类似...
var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");
或者像...
这样的异步方式
var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately
但是,我正在努力寻找一种好的方法来执行相同的阅读消息操作。目前,当阅读一条消息时,我会触发一个 MessageAvailable
事件并将消息作为参数之一传递进来。
我只是想不出实现同步读取的正确方法。
我考虑的是:
具有获取消息的 GetNextMessage()
同步方法。在内部,这可以通过两种不同的方式处理:
我可以在 IEnumerable<Message>
中保留所有尚未使用的消息。因此,只要对方发送消息,我就会从流中读取它并将其存储在内存中,以便稍后可以被 GetNextMessage()
使用。优点是它在写入消息后立即释放流,因此它不会阻止另一方发送其他消息。缺点是我完全无法控制我将持有多少消息或它们的大小。我的 IEnumerable<Message>
可能最终有 10GB 的未使用消息,我对此无能为力,因为我无法强制消费者检索消息。
我可以认为我只将一条消息存储在内部缓冲区中,并且只有在通过 GetNextMessage()
使用该消息后才会再次开始读取。但是,如果我这样做,另一方将无法写入其他消息,直到前一条消息被消耗掉。更准确地说,另一端将能够写入直到流满为止。可以是多条完整的小消息,也可以是一条不完整的消息。在不完整的单个消息的情况下,我认为这是一种更糟糕的方法,因为在发送消息的第 1 部分和后续部分之间,另一端可能最终断开连接并且整个消息将丢失。
更难的是,在上述任何一种方法中,消费者总是有可能使用事件来接收消息(记住事件包含接收到的消息),因此不需要 GetNextMessage()
。我要么需要完全停止在事件中发送消息,要么找到一种方法,如果消息是通过事件消耗的,则不将事件推送到内部缓冲区。虽然我可以很容易地判断是否有事件处理程序,但没有办法知道消息是否真的在那里处理(即,考虑一个 class 实现这个并监听那个事件,但什么都不做用它)。我在这里看到的唯一真正的方法是从事件中删除消息,强制消费者始终调用 GetNextMessage()
,但对其他想法持开放态度。
这两种方法都有另一个问题,即如果使用 WriteAsync()
(或使用 Write()
,我无法控制消息发送的顺序来自不同的线程)。
谁能想出更好的方法来解决这个问题?
我建议采用以下方法。创建接口:
public interface ISubscription : IDisposable {
Message NextMessage(TimeSpan? timeout);
}
public class Message {
}
然后这样实现:
public class NamedPipeServer {
public void StartAndWait() {
}
public ISubscription StartAndSubscribe() {
// prevent race condition before Start and subscribing to MessageAvailable
var subscription = new Subscription(this);
StartAndWait();
return subscription;
}
public ISubscription Subscribe() {
// if user wants to subscribe and some point after start - why not
return new Subscription(this);
}
public event Action<Message> MessageAvailable;
private class Subscription : ISubscription {
// buffer
private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
new ConcurrentQueue<Message>());
private readonly NamedPipeServer _server;
public Subscription(NamedPipeServer server) {
// subscribe to event
_server = server;
_server.MessageAvailable += OnMessageAvailable;
}
public Message NextMessage(TimeSpan? timeout) {
// this is blocking call
if (timeout == null)
return _queue.Take();
else {
Message tmp;
if (_queue.TryTake(out tmp, timeout.Value))
return tmp;
return null;
}
}
private void OnMessageAvailable(Message msg) {
// add to buffer
_queue.Add(msg);
}
public void Dispose() {
// clean up
_server.MessageAvailable -= OnMessageAvailable;
_queue.CompleteAdding();
_queue.Dispose();
}
}
}
客户端然后调用 Subscribe
或 StartAndSubscribe
。
var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();
这样一来,如果没有人订阅 - 您就不会缓冲任何消息。如果有人订阅然后不消费 - 这是他们的问题,而不是你的问题,因为缓冲发生在他们现在拥有的订阅中。您还可以限制 _queue
阻塞集合的大小,然后在达到限制时向其添加将阻塞,从而阻塞您的 MessageAvailable
事件,但我不建议这样做。
我目前正在为 NamedPipeServerStream
/NamedPipeClientStream
编写一个完全基于 Event
的小型包装器,而不是使用 AsyncCallbacks
.
我公开了几乎所有可能的同步和异步方法(connecting/waiting 用于连接、写入等)所以如果消费者想要,例如,启动服务器实例并在客户端发送消息时连接他可以完全同步路线并做类似...
var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");
或者像...
这样的异步方式var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately
但是,我正在努力寻找一种好的方法来执行相同的阅读消息操作。目前,当阅读一条消息时,我会触发一个 MessageAvailable
事件并将消息作为参数之一传递进来。
我只是想不出实现同步读取的正确方法。
我考虑的是:
具有获取消息的 GetNextMessage()
同步方法。在内部,这可以通过两种不同的方式处理:
我可以在
IEnumerable<Message>
中保留所有尚未使用的消息。因此,只要对方发送消息,我就会从流中读取它并将其存储在内存中,以便稍后可以被GetNextMessage()
使用。优点是它在写入消息后立即释放流,因此它不会阻止另一方发送其他消息。缺点是我完全无法控制我将持有多少消息或它们的大小。我的IEnumerable<Message>
可能最终有 10GB 的未使用消息,我对此无能为力,因为我无法强制消费者检索消息。我可以认为我只将一条消息存储在内部缓冲区中,并且只有在通过
GetNextMessage()
使用该消息后才会再次开始读取。但是,如果我这样做,另一方将无法写入其他消息,直到前一条消息被消耗掉。更准确地说,另一端将能够写入直到流满为止。可以是多条完整的小消息,也可以是一条不完整的消息。在不完整的单个消息的情况下,我认为这是一种更糟糕的方法,因为在发送消息的第 1 部分和后续部分之间,另一端可能最终断开连接并且整个消息将丢失。
更难的是,在上述任何一种方法中,消费者总是有可能使用事件来接收消息(记住事件包含接收到的消息),因此不需要 GetNextMessage()
。我要么需要完全停止在事件中发送消息,要么找到一种方法,如果消息是通过事件消耗的,则不将事件推送到内部缓冲区。虽然我可以很容易地判断是否有事件处理程序,但没有办法知道消息是否真的在那里处理(即,考虑一个 class 实现这个并监听那个事件,但什么都不做用它)。我在这里看到的唯一真正的方法是从事件中删除消息,强制消费者始终调用 GetNextMessage()
,但对其他想法持开放态度。
这两种方法都有另一个问题,即如果使用 WriteAsync()
(或使用 Write()
,我无法控制消息发送的顺序来自不同的线程)。
谁能想出更好的方法来解决这个问题?
我建议采用以下方法。创建接口:
public interface ISubscription : IDisposable {
Message NextMessage(TimeSpan? timeout);
}
public class Message {
}
然后这样实现:
public class NamedPipeServer {
public void StartAndWait() {
}
public ISubscription StartAndSubscribe() {
// prevent race condition before Start and subscribing to MessageAvailable
var subscription = new Subscription(this);
StartAndWait();
return subscription;
}
public ISubscription Subscribe() {
// if user wants to subscribe and some point after start - why not
return new Subscription(this);
}
public event Action<Message> MessageAvailable;
private class Subscription : ISubscription {
// buffer
private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
new ConcurrentQueue<Message>());
private readonly NamedPipeServer _server;
public Subscription(NamedPipeServer server) {
// subscribe to event
_server = server;
_server.MessageAvailable += OnMessageAvailable;
}
public Message NextMessage(TimeSpan? timeout) {
// this is blocking call
if (timeout == null)
return _queue.Take();
else {
Message tmp;
if (_queue.TryTake(out tmp, timeout.Value))
return tmp;
return null;
}
}
private void OnMessageAvailable(Message msg) {
// add to buffer
_queue.Add(msg);
}
public void Dispose() {
// clean up
_server.MessageAvailable -= OnMessageAvailable;
_queue.CompleteAdding();
_queue.Dispose();
}
}
}
客户端然后调用 Subscribe
或 StartAndSubscribe
。
var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();
这样一来,如果没有人订阅 - 您就不会缓冲任何消息。如果有人订阅然后不消费 - 这是他们的问题,而不是你的问题,因为缓冲发生在他们现在拥有的订阅中。您还可以限制 _queue
阻塞集合的大小,然后在达到限制时向其添加将阻塞,从而阻塞您的 MessageAvailable
事件,但我不建议这样做。