gpt4 book ai didi

c# - 检测到 "Busy"WebSocket

转载 作者:太空宇宙 更新时间:2023-11-03 12:47:11 24 4
gpt4 key购买 nike

我最近在尝试使用 SendAsync 方法广播到打开的 WebSocket 时遇到问题,收到 InvalidOperationException 消息“发送操作已经在进行中”

深入挖掘 WebSocket 类的源代码,在内部跟踪忙碌状态:

internal ChannelState _sendState;

// Represents the state of a single channel; send and receive have their own states
internal enum ChannelState {
Ready, // this channel is available for transmitting new frames
Busy, // the channel is already busy transmitting frames
Closed // this channel has been closed
}

理想情况下,如果我要向 WebSocket 连接广播更新,我想提前知道它正忙并执行一些其他处理(例如将消息排队)。

这个状态被标记为内部状态似乎很奇怪——如果它是我可以简单检查的公共(public)属性(property)

context.WebSocket.SendState == ChannelState.Ready

WebSocket 上 SendAsync 的正确模式是什么可以防止抛出此异常?

我不愿意通过反射破解对该属性的访问。

编辑澄清:

WebSocket.State 属性对这种情况无济于事。该属性使用此枚举:

public enum WebSocketState
{
None,
Connecting,
Open,
CloseSent,
CloseReceived,
Closed,
Aborted
}

一旦套接字连接打开,无论它是否正忙于发送,该语句都将评估为“真”:

context.WebSocket.State == WebSocketState.Open

最佳答案

我已经实现了一个似乎可以满足我的需求的解决方案。

基本问题是当两个线程试图在同一个 WebSocket 上发送时。

我的解决方案有几个部分,并且取决于它在 AspNetWebSocketContect 下运行这一事实,因此我可以使用“Items”字典来存储有关当前连接的属性。

  1. “发送”属性用于跟踪 WebSocket 是否繁忙。
  2. “队列”属性是要发送的 ArraySegment 列表。
  3. 锁定 WebSocket 对象并同步运行 Send 方法。
  4. 发送完成后处理队列中的任何项目。
  5. 在方法开始时屈服以防止阻塞。

这是我目前在开发环境中使用的代码——我会监控它的扩展性:

/// <summary>
/// Send a message to a specific client.
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
private static async Task SendMessage(AspNetWebSocketContext context, ArraySegment<byte> buffer)
{
// Return control to the calling method immediately.
await Task.Yield();

// Make sure we have data.
if (buffer.Count == 0)
return;

// The state of the connection is contained in the context Items dictionary.
bool sending;
lock (context)
{
// Are we already in the middle of a send?
bool.TryParse(context.Items["sending"]?.ToString(), out sending);

// If not, we are now.
if (!sending)
context.Items["sending"] = true;
}

if (!sending)
{
// Lock with a timeout, just in case.
if (!Monitor.TryEnter(context.WebSocket, 1000))
{
// If we couldn't obtain exclusive access to the socket in one second, something is wrong.
await context.WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None);
return;
}

try
{
// Send the message synchronously.
var t = context.WebSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
t.Wait();
}
finally
{
Monitor.Exit(context.WebSocket);
}

// Note that we've finished sending.
lock (context)
{
context.Items["sending"] = false;
}

// Handle any queued messages.
await HandleQueue(context);
}
else
{
// Add the message to the queue.
lock (context)
{
var queue = context.Items["queue"] as List<ArraySegment<byte>>;
if (queue == null)
context.Items["queue"] = queue = new List<ArraySegment<byte>>();
queue.Add(buffer);
}
}
}

/// <summary>
/// If there was a message in the queue for this particular web socket connection, send it.
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
private static async Task HandleQueue(AspNetWebSocketContext context)
{
var buffer = new ArraySegment<byte>();
lock (context)
{
// Check for an item in the queue.
var queue = context.Items["queue"] as List<ArraySegment<byte>>;
if (queue != null && queue.Count > 0)
{
// Pull it off the top.
buffer = queue[0];
queue.RemoveAt(0);
}
}

// Send that message.
if (buffer.Count > 0)
await SendMessage(context, buffer);
}

我对这种方法的一些考虑:

  1. 我认为锁定一个简单的对象比锁定一个更复杂的对象更好,就像我在上面所做的那样。我不确定使用“context”和“context.WebSocket”对象进行锁定会产生什么后果。
  2. 理论上我不需要锁定 WebSocket,因为我已经在测试“发送”属性。然而,测试导致 WebSocket 在几秒钟的重负载下变得无响应。一旦我实现了锁定模式,这种情况就消失了。
  3. 我通过同一个 WebSocket 连接对 10 个并发线程(每个线程发送 1000 条消息)进行了测试。每条消息都有编号,这样我就可以在客户端记录它们,并且每条消息都通过了。所以队列系统似乎可以工作。

关于c# - 检测到 "Busy"WebSocket,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36877030/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com