gpt4 book ai didi

c++ - 将 zmq::proxy 与 REQ/REP 模式结合使用

转载 作者:行者123 更新时间:2023-11-28 04:10:39 25 4
gpt4 key购买 nike

我试图了解 zmq::proxy 是如何工作的,但我遇到了问题:我希望将消息路由到正确的工作人员,但似乎身份和信封被忽略了:在示例 我想将消息从 client1 路由到 worker2,并将消息从 client2 路由到 worker1,但这些消息似乎是根据基于“第一个可用的 worker”的规则提供的。我做错了什么,还是我误解了身份的工作原理?

#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>

#include <zmq.hpp>
#include <zmq_addon.hpp>

using namespace zmq;
std::atomic_bool running;
context_t context(4);
std::mutex mtx;

void client_func(std::string name, std::string target, std::string message)
{
std::this_thread::sleep_for(std::chrono::seconds(1));

socket_t request_socket(context, socket_type::req);
request_socket.connect("inproc://router");
request_socket.setsockopt( ZMQ_IDENTITY, name.c_str(), name.size());

while(running)
{
multipart_t msg;
msg.addstr(target);
msg.addstr("");
msg.addstr(message);

std::cout << name << "sent a message: " << message << std::endl;
msg.send(request_socket);
multipart_t reply;
if(reply.recv(request_socket))
{
std::unique_lock<std::mutex>(mtx);
std::cout << name << " received a reply!" << std::endl;

for(size_t i = 0 ; i < reply.size() ; i++)
{
std::string theData(static_cast<char*>(reply[i].data()),reply[i].size());
std::cout << "Part " << i << ": " << theData << std::endl;
}

}

std::this_thread::sleep_for(std::chrono::seconds(1));
}

request_socket.close();
}


void worker_func(std::string name, std::string answer)
{
std::this_thread::sleep_for(std::chrono::seconds(1));

socket_t response_socket(context, socket_type::rep);
response_socket.connect("inproc://dealer");
response_socket.setsockopt( ZMQ_IDENTITY, "resp", 4);

while(running)
{
multipart_t request;

if(request.recv(response_socket))
{
std::unique_lock<std::mutex>(mtx);

std::cout << name << " received a request:" << std::endl;
for(size_t i = 0 ; i < request.size() ; i++)
{
std::string theData(static_cast<char*>(request[i].data()),request[i].size());
std::cout << "Part " << i << ": " << theData << std::endl;
}

std::string questioner(static_cast<char*>(request[0].data()),request[0].size());

multipart_t msg;
msg.addstr(questioner);
msg.addstr("");
msg.addstr(answer);

msg.send(response_socket);
}
}

response_socket.close();
}


int main(int argc, char* argv[])
{
running = true;

zmq::socket_t dealer(context, zmq::socket_type::dealer);
zmq::socket_t router(context, zmq::socket_type::router);
dealer.bind("inproc://dealer");
router.bind("inproc://router");

std::thread client1(client_func, "Client1", "Worker2", "Ciao");
std::thread client2(client_func, "Client2", "Worker1", "Hello");
std::thread worker1(worker_func, "Worker1","World");
std::thread worker2(worker_func, "Worker2","Mondo");

zmq::proxy(dealer,router);

dealer.close();
router.close();

if(client1.joinable())
client1.join();

if(client2.joinable())
client2.join();

if(worker1.joinable())
worker1.join();

if(worker2.joinable())
worker2.join();

return 0;
}

最佳答案

来自docs :

When the frontend is a ZMQ_ROUTER socket, and the backend is a ZMQ_DEALER socket, the proxy shall act as a shared queue that collects requests from a set of clients, and distributes these fairly among a set of services. Requests shall be fair-queued from frontend connections and distributed evenly across backend connections. Replies shall automatically return to the client that made the original request.

代理处理多个客户端并使用多个工作程序来处理请求。身份用于将响应发送到正确的客户端。您不能使用身份来“选择”特定的工作人员。

关于c++ - 将 zmq::proxy 与 REQ/REP 模式结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57870848/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com