gpt4 book ai didi

c# - 为什么 Mono 上的 NetMQ DealerSocket 不向 Debian Wheezy 上的服务器发送消息,但在 Windows 上却发送消息?

转载 作者:行者123 更新时间:2023-12-05 05:21:20 26 4
gpt4 key购买 nike

我在 Debian Wheezy 上的 Mono 4.8 上使用 NetMQ 4.0.0.1 时遇到一些问题。

Dealer socket 在我不停止调用它发送新消息之前不发送任何消息。当我将 Thread.Sleep( 1000 ) 放在创建任务之间时,一切正常。我想承认,一切都在 .Net Framework 4.5 和 .Net Core 1.1 的 Windows 上运行,没有任何 Thread.Sleep()

我有这样的模式: enter image description here

我添加了调试消息,我可以看到我正在循环中的任务中创建 100 个 REQ 套接字,并且路由器正在队列中获取请求,而不是通过 Dealer 发送它们,TCP 的另一端什么也没有发生,直到我停止在 REQ 套接字上发送调用。每 5 个任务的简单 Thread.Sleep() 正在运行。看起来像是 Poller 错误或 Dealer 错误,或者我做错了什么。

中间框代码如下:

public class CollectorDevice : IDisposable
{
private NetMQPoller _poller;
private RouterSocket _frontendSocket;
private DealerSocket _backendSocket;
private readonly string _backEndAddress;
private readonly string _frontEndAddress;
private readonly int _expectedFrameCount;
private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false);
private readonly Thread _localThread;
private static Logger _logger = LogManager.GetCurrentClassLogger();

/// <summary>
/// Constructor
/// </summary>
/// <param name="backEndAddress"></param>
/// <param name="frontEndAddress"></param>
/// <param name="expectedFrameCount"></param>
public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
{
_expectedFrameCount = expectedFrameCount;

_backEndAddress = backEndAddress;
_frontEndAddress = frontEndAddress;

_localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
}

public void Start()
{
_localThread.Start();
_startSemaphore.WaitOne();


}

public void Stop()
{
_poller.Stop();
}

#region Implementation of IDisposable

public void Dispose()
{
Stop();
}

#endregion


#region Private Methods
private void DoWork()
{
try
{
using (_poller = new NetMQPoller())
using (_frontendSocket = new RouterSocket(_frontEndAddress))
using (_backendSocket = new DealerSocket(_backEndAddress))
{
_backendSocket.ReceiveReady += OnBackEndReady;
_frontendSocket.ReceiveReady += OnFrontEndReady;


_poller.Add(_frontendSocket);
_poller.Add(_backendSocket);

_startSemaphore.Set();

_poller.Run();
}
}
catch (Exception e)
{
_logger.Error(e);
}
}

private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
{
NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
_frontendSocket.SendMultipartMessage(message);
}

private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
{
NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
_backendSocket.SendMultipartMessage(message);
}

#endregion
}

这是一个客户端:

class Program
{
private static Logger _logger = LogManager.GetCurrentClassLogger();


private static void Main(string[] args)
{
Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
Console.ReadKey();


using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3))
{
collectorDevice.Start();

List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
Console.WriteLine(i);
int j = i;
Task t = Task.Factory.StartNew(() =>
{
try
{
using (var req = new RequestSocket("inproc://broker"))
{
req.SendFrame(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
_logger.Debug(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
Console.WriteLine(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));

string responseMessage = req.ReceiveFrameString();
_logger.Debug(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
Console.WriteLine(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
}
}
catch (Exception e)
{
Console.WriteLine(e);
_logger.Error(e);
}
});
tasks.Add(t);
//Thread.Sleep (100);//<- This thread sleep is fixing problem?
}

Task.WaitAll(tasks.ToArray());
}

}
}

和服务器端:

class Program
{
private static Logger _logger = LogManager.GetCurrentClassLogger();

static void Main(string[] args)
{
try{
using (var routerSocket = new RouterSocket("@tcp://*:5556"))
{
var poller = new NetMQPoller();
routerSocket.ReceiveReady += RouterSocketOnReceiveReady;
poller.Add(routerSocket);
poller.Run();
}
}
catch(Exception e)
{
Console.WriteLine (e);
}

Console.ReadKey ();
}

private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
{
NetMQMessage clientMessage = new NetMQMessage();
bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5),
ref clientMessage, 5);

if (result == false)
{
Console.WriteLine ("Something went wrong?!");
}

var address = clientMessage[0];
var address2 = clientMessage[1];
var clientMessageString = clientMessage[3].ConvertToString();

//_logger.Debug("Message from client received: '{0}'", clientMessageString);
Console.WriteLine (String.Format ("Message from client received: '{0}'", clientMessageString));

netMqSocketEventArgs
.Socket.SendMoreFrame(address.Buffer)
.SendMoreFrame(address2.Buffer)
.SendMoreFrameEmpty()
.SendFrame("I have received your message");
}
}

有人知道吗?

我在想我可能正在使用来自不同线程的套接字,所以我将它打包到 ThreadLocal 结构中,但这没有帮助。比我读到的一些与 NetMQ 统一的问题所以我添加了'AsyncIO.ForceDotNet。力量();'在每次套接字构造函数调用之前,这也没有帮助。然后我将我的单声道从 4.4 更新到 4.8,它看起来还是一样。

我已检查任务之间的 Thread.Sleep(100) 是否正在解决问题。很奇怪

最佳答案

我测试了代码,它确实花费了很多时间,但最终服务器收到了所有消息(大约需要一分钟)。

问题是线程的数量,当有 100 个线程时,所有应该在 io 完成端口线程上完成的异步操作都会花费很多时间。我能够使用以下代码在没有 NetMQ 的情况下重现它

    public static void Main(string[] args)
{
ManualResetEvent resetEvent = new ManualResetEvent(false);

List<Task> tasks = new List<Task>();

for (int i = 0; i < 100; i++)
{
tasks.Add(Task.Run(() =>
{
resetEvent.WaitOne();
}));
}

Thread.Sleep(100);

Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Any, 5556));
listener.Listen(1);

SocketAsyncEventArgs args1 = new SocketAsyncEventArgs();
args1.Completed += (sender, eventArgs) =>
{
Console.WriteLine($"Accepted {args1.SocketError}");
resetEvent.Set();
};
listener.AcceptAsync(args1);

SocketAsyncEventArgs args2 = new SocketAsyncEventArgs();
args2.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 5556);
args2.Completed += (sender, eventArgs) => Console.WriteLine("Connected");
Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.ConnectAsync(args2);

Task.WaitAll(tasks.ToArray());

Console.WriteLine("all tasks completed");
}

您可以看到这也需要大约一分钟的时间。只有 5 个线程,它立即完成。

无论如何,您可能希望启动更少的线程和/或重新解决单声道项目中的错误。

关于c# - 为什么 Mono 上的 NetMQ DealerSocket 不向 Debian Wheezy 上的服务器发送消息,但在 Windows 上却发送消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43094383/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com