- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我目前正在从事一个项目,该项目需要通过来自分布式系统的某些实体的不同数据类型的网络进行一些通信,并且我正在使用 ZMQ。
该项目的主要目标是拥有一个中央节点,为可以随时连接的客户端提供服务。对于每个连接的客户端,中心节点应该管理两者之间的消息通信。
目前,所有通信都通过 TCP 进行。
客户端需要随时发送和接收消息,所以它们是ZMQ_DEALER
类型的套接字,中心节点是ZMQ_ROUTER
最初,目标是来自某个客户端的一条消息,这条消息到达其他客户端。这意味着其他客户端可以看到所有相同的数据。
我使用了 Asynchronous Client/Server pattern因为我有兴趣让多个客户端以协作方式相互交谈,可能有服务器代理或中间件。
我有一个连接到 ZMQ_ROUTER
套接字服务器的 ZMQ_DEALER
套接字客户端
#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;
int main(int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t client(context, ZMQ_DEALER);
const string endpoint = "tcp://localhost:5559";
client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
client.connect(endpoint);
for (int request = 0; request < 10; request++)
{
s_sendmore(client, "");
s_send(client, "Testing sending some data");
std::string string = s_recv(client);
std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}
}
在我的服务器代码上,我有一个接收和管理消息的 ZMQ_ROUTER,将其绑定(bind)到一个良好的端口。这个服务器是用 Python 制作的
import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
print("Creating Server Network Manager Router")
while True:
socks = dict(poller.poll())
if socks.get(frontend) == zmq.POLLIN:
message = frontend.recv_multipart()
print(message)
frontend.send_multipart(message)
在我的其他同行/客户上,我有以下内容:
#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;
int main (int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t peer2(context, ZMQ_DEALER);
const string endpoint = "tcp://localhost:5559";
peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
peer2.connect(endpoint);
//s_sendmore(peer2, "");
//s_send(peer2, "Probando");
//std::string string = s_recv(peer2);
//std::cout << "Received reply " << " [" << string << "]" << std::endl;
for (int request = 0; request < 10; request++)
{
s_sendmore(peer2, "");
s_send(peer2, "Probando");
std::string string = s_recv(peer2);
std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}
}
更新
但是每次我执行某个客户端时,它们各自的消息都不会到达另一个对等客户端。消息到达 ZMQ_ROUTER
,并返回到 ZMQ_DEALER
发件人来源。
这是因为身份帧在接收时是在ROUTER之前的,消息是通过ROUTER发回的;它删除身份并使用该值将消息路由回相关经销商,according to the ZMQ_ROUTER section to the end page here .
这是逻辑,我将我的 DEALER
的身份发送到 ROUTER
,ROUTER
获取该身份框架并返回到我的 DEALER
消息
首先,为了开始我的实现,我需要任何经销商发送的一些消息,这将被任何其他经销商可视化,无论有多少经销商(一个或多个)连接到 ZMQ_ROUTER。从这个意义上说……是否有必要满足其他 DEALER 或其他 DEALERS 的身份框架?
如果我有DEALER A
、DEALER B
、DEALER C
和ROUTER
然后:
DEALER A
发送消息 ...我希望来自 DEALER A 的消息到达 DEALER B
和 DEALER C
以及可以加入我的 session 对话的其他 DEALER
。 ..
在这个想法顺序中,有必要在 DEALER A
端满足 DEALER B
和 DEALER C
的身份框架,这样消息到达他?
如何知道我的实现中存在的每个 DEALER 的身份框架?这是在ROUTER端做的?我还没有清除这个
最佳答案
您可以让所有客户端在启动时发送“我在这里”消息。然后中央服务器可以存储所有 ID,c.f. worker 和路由器之间的初始通信在这里:http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker .服务器会将任何收到的消息发送给所有当前已知的客户端。您应该添加一些心跳以检测断开连接的客户端,c.f. http://zguide.zeromq.org/page:all#Heartbeating .
但是ZeroMQ已经自带了这样一种通信模式:PUB
—SUB
。本质上,每个客户端都有一个 DEALER
和一个 SUB
套接字连接到服务器的 ROUTER
和 PUB
套接字。服务器简单地发送任何接收到的通过PUB
套接字向所有 客户端发送消息。如果这对原始客户端来说是个问题,您可以在消息中包含客户端 ID,以便每个客户端都可以过滤掉具有自己 ID 的消息。另请参阅指南中的示例 http://zguide.zeromq.org/page:all#Getting-an-Out-of-Band-Snapshot
另一个有趣的模式是 Republishing Updates from Clients :
此处PUSH
--PULL
用于将更新发送到服务器。如果不需要来自服务器的回复消息,这是有意义的。如果您不需要该示例中的状态请求,则可以省略 ROUTER
--DEALER
部分。为简洁起见,这里有一个使用 Python 的示例实现。服务器监听 PULL
套接字并通过 PUB
套接字发送所有内容:
import zmq
def main():
# context and sockets
ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5557")
collector = ctx.socket(zmq.PULL)
collector.bind("tcp://*:5558")
while True:
message = collector.recv()
print "I: publishing update %s" % message
publisher.send(message)
if __name__ == '__main__':
main()
客户端监听 PUB
套接字一段时间。如果收到一条消息,它会被记录下来。如果达到超时,将以十分之一的几率生成一条消息:
import random
import time
import zmq
def main():
# Prepare our context and subscriber
ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
subscriber.connect("tcp://localhost:5557")
publisher = ctx.socket(zmq.PUSH)
publisher.connect("tcp://localhost:5558")
random.seed(time.time())
while True:
if subscriber.poll(100) & zmq.POLLIN:
message = subscriber.recv()
print "I: received message %s" % message
else:
rand = random.randint(1, 100)
if rand < 10:
publisher.send("%d" % rand)
print "I: sending message %d" % rand
if __name__ == '__main__':
main()
关于c++ - ZMQ 经销商 - 路由器通讯,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49289072/
我目前正在从事一个项目,该项目需要通过来自分布式系统的某些实体的不同数据类型的网络进行一些通信,并且我正在使用 ZMQ。 该项目的主要目标是拥有一个中央节点,为可以随时连接的客户端提供服务。对于每个连
在以下DEALER至 DEALER连接, worker DEALER发送 [][Foo!] ,即一个 2 帧的消息,到服务器 DEALER . package net.async import org
我是一名优秀的程序员,十分优秀!