gpt4 book ai didi

zeromq - NetMQ 为什么 Req-Rep 需要 "SendReady"?

转载 作者:行者123 更新时间:2023-12-05 02:20:53 25 4
gpt4 key购买 nike

我有一个问题,我设法解决了...但是我有点担心,因为我真的不明白为什么该解决方案有效;

我正在使用 NetMQ,特别是具有多个套接字的 NetMQ 轮询器,其中一个是 REQ-REP 对。

我有一个请求队列,这些请求被从队列中取出,服务器根据需要处理每个请求类型并发回适当的响应。这一直没有问题,但是当我尝试添加其他请求类型时,系统按预期停止工作;会发生什么是请求将到达服务器,服务器将发送响应......而客户端不会收到它。在服务器关闭之前,客户端不会收到消息(异常行为!)。

我一直在使用我在发送请求之前设置的标志来管理 REQ-REP 对,并在收到回复时重置。我设法通过只在 REQ 套接字的“SendReady”事件中触发回复来解决这个问题——这自动修复了我的所有问题,但是我在文档中找不到任何东西告诉我为什么套接字可能没有处于“发送就绪”状态,或者这实际上是做什么的。

任何可以透露为什么现在可以正常工作的信息都很棒 :)

干杯。

编辑:来源

客户:

“订阅”作为 UI 的单独线程运行

    private void Subscribe(string address)
{
using (var req = new RequestSocket(address + ":5555"))
using (var sub = new SubscriberSocket(address + ":5556"))
using (var poller = new NetMQPoller { req, sub })
{
// Send program code when a request for a code update is received
sub.ReceiveReady += (s, a) =>
{
var type = sub.ReceiveFrameString();
var reply = sub.ReceiveFrameString();

switch (type)
{
case "Type1":
manager.ChangeValue(reply);
break;

case "Type2":
string[] args = reply.Split(',');
eventAggregator.PublishOnUIThread(new MyEvent(args[0], (SimObjectActionEventType)Enum.Parse(typeof(MyEventType), args[1])));
break;
}
};

req.ReceiveReady += Req_ReceiveReady;

poller.RunAsync();

sub.Connect(address + ":5556");
sub.SubscribeToAnyTopic();
sub.Options.ReceiveHighWatermark = 10;

reqQueue = new Queue<string[]>();

reqQueue.Enqueue(new string[] { "InitialiseClient", "" });

req_sending = false;

while (programRunning)
{
if (reqQueue.Count > 0 && !req_sending)
{
req_sending = true;
string[] request = reqQueue.Dequeue();
Console.WriteLine("Sending " + request[0] + " " + request[1]);
req.SendMoreFrame(request[0]).SendFrame(request[1]);
}

Thread.Sleep(1);
}
}
}

private void Req_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
var req = e.Socket;

var messageType = req.ReceiveFrameString();

Console.WriteLine("Received {0}", messageType);

switch (messageType)
{
case "Reply1":
// Receive action

break;

case "Reply2":
// Receive action

break;

case "Reply3":
// Receive action

break;

}

req_sending = false;
}

服务器:

        using (var rep = new ResponseSocket("@tcp://*:5555"))
using (var pub = new PublisherSocket("@tcp://*:5556"))
using (var beacon = new NetMQBeacon())
using (var poller = new NetMQPoller { rep, pub, beacon })
{
// Send program code when a request for a code update is received
rep.ReceiveReady += (s, a) =>
{
var messageType = rep.ReceiveFrameString();
var message = rep.ReceiveFrameString();

Console.WriteLine("Received {0} - Content: {1}", messageType, message);

switch (messageType)
{
case "InitialiseClient":
// Send
rep.SendMoreFrame("Reply1").SendFrame(repData);
break;

case "Req2":
// do something
rep.SendMoreFrame("Reply2").SendFrame("RequestOK");

break;

case "Req3":
args = message.Split(',');

if (args.Length == 2)
{
// Do Something

rep.SendMoreFrame("Reply3").SendFrame("RequestOK");
}
else
{
rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
}

break;

case "Req4":
args = message.Split(',');

if (args.Length == 2)
{

requestData = //do something

rep.SendMoreFrame("Reply4").SendFrame(requestData);
}
else
{
rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
}

break;

default:
rep.SendMoreFrame("Ack").SendFrame("Error");
break;
}
};

// setup discovery beacon with 1 second interval
beacon.Configure(5555);
beacon.Publish("server", TimeSpan.FromSeconds(1));

// start the poller
poller.RunAsync();

// run the simulation loop
while (serverRunning)
{
// todo - make this operate for efficiently
// push updated variable values to clients
foreach (string[] message in pubQueue)
{
pub.SendMoreFrame(message[0]).SendFrame(message[1]);
}

pubQueue.Clear();

Thread.Sleep(2);
}

poller.StopAsync();
}

最佳答案

您正在使用来自多个线程的请求套接字,这是不支持的。您在主线程上发送并在轮询器线程上接收。

尝试使用 NetMQQueue 而不是使用常规队列,您可以将其添加到轮询器并从 UI 线程入队。然后发送和接收都发生在轮询器线程上。

您可以在此处阅读文档: http://netmq.readthedocs.io/en/latest/queue/

关于zeromq - NetMQ 为什么 Req-Rep 需要 "SendReady"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37804936/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com