gpt4 book ai didi

c# - NamedPipeServerStream/NamedPipeClientStream 包装器

转载 作者:行者123 更新时间:2023-11-30 17:31:46 25 4
gpt4 key购买 nike

我目前正在为 NamedPipeServerStream 编写一个小包装器/NamedPipeClientStream那是完全Event基于而不是使用 AsyncCallbacks .

我为几乎所有可能的事情(连接/等待连接、写入等)公开同步和异步方法,因此如果消费者想要,例如,启动服务器实例并在客户端连接时发送消息,他可以进入完全同步路线并做类似...

var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");

或者像...这样的异步方式

var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately

但是,我正在努力寻找一种好的方法来执行相同的阅读消息操作。目前,当阅读一条消息时,我会触发 MessageAvailable事件并将消息作为参数之一传递。

我就是想不出一个合适的方法来实现同步读取。

我考虑的是以下几点:

有一个 GetNextMessage()获取消息的同步方法。在内部,这可以通过两种不同的方式处理:

  • 我可以保留一个 IEnumerable<Message>包含所有尚未使用的消息。因此,只要另一方发送消息,我就会从流中读取它并将其存储在内存中,以便以后可以被 GetNextMessage() 使用。 .优点是它在写入消息后立即释放流,因此它不会阻止另一方发送其他消息。缺点是我完全无法控制我将持有多少消息或它们的大小。我的IEnumerable<Message>最终可能会有 10GB 的未使用消息,我对此无能为力,因为我无法强制消费者检索消息。

  • 我可以认为我只在内部缓冲区中存储一条消息,并且只有在通过 GetNextMessage() 使用该消息后才开始再次读取.但是,如果我这样做,另一方将无法写入其他消息,直到前一条消息被消耗掉。更准确地说,另一端将能够写入直到流满为止。可以是多条完整的小消息,也可以是一条不完整的消息。在单个消息不完整的情况下,我认为这是一种更糟糕的方法,因为在发送消息的第 1 部分和后续部分之间,另一端可能会断开连接,整个消息将丢失。

    <

使事情变得更难的是,在上述任何一种方法中,消费者总是有可能使用事件来接收消息(记住事件包含接收到的消息),因此不需要 GetNextMessage() .我要么需要完全停止在事件中发送消息,要么找到一种方法,如果消息是通过事件消耗的,则不将事件推送到内部缓冲区。虽然我可以很容易地判断是否有事件处理程序,但没有办法知道消息是否真的在那里被处理(即,考虑一个实现这个事件并监听那个事件,但不对它做任何事情的类)。我在这里看到的唯一真正的方法是从事件中删除消息,强制消费者始终调用 GetNextMessage() , 但对其他想法持开放态度。

这两种方法还有另一个问题,即如果WriteAsync(),我无法控制消息发送的顺序。使用(或 Write() 从不同线程使用)。

谁能想出更好的方法来解决这个问题?

最佳答案

我建议采用以下方法。创建接口(interface):

public interface ISubscription : IDisposable {
Message NextMessage(TimeSpan? timeout);
}

public class Message {

}

然后像那样实现:

public class NamedPipeServer {        
public void StartAndWait() {

}

public ISubscription StartAndSubscribe() {
// prevent race condition before Start and subscribing to MessageAvailable
var subscription = new Subscription(this);
StartAndWait();
return subscription;
}

public ISubscription Subscribe() {
// if user wants to subscribe and some point after start - why not
return new Subscription(this);
}

public event Action<Message> MessageAvailable;

private class Subscription : ISubscription {
// buffer
private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
new ConcurrentQueue<Message>());

private readonly NamedPipeServer _server;

public Subscription(NamedPipeServer server) {
// subscribe to event
_server = server;
_server.MessageAvailable += OnMessageAvailable;
}

public Message NextMessage(TimeSpan? timeout) {
// this is blocking call
if (timeout == null)
return _queue.Take();
else {
Message tmp;
if (_queue.TryTake(out tmp, timeout.Value))
return tmp;
return null;
}
}

private void OnMessageAvailable(Message msg) {
// add to buffer
_queue.Add(msg);
}

public void Dispose() {
// clean up
_server.MessageAvailable -= OnMessageAvailable;
_queue.CompleteAdding();
_queue.Dispose();
}
}
}

然后客户端调用 SubscribeStartAndSubscribe

var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();

这样一来,如果没有人订阅 - 您就不会缓冲任何消息。如果有人订阅然后不消费 - 这是他们的问题,而不是你的问题,因为缓冲发生在他们现在拥有的订阅中。您还可以限制 _queue 阻塞集合的大小,然后在达到限制时向其添加将阻塞,从而阻塞您的 MessageAvailable 事件,但我不建议这样做。

关于c# - NamedPipeServerStream/NamedPipeClientStream 包装器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47456073/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com