- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我无法理解我的数据最终去了哪里。
我已经编写了一个测试来确保我的 Publisher
类成功发送数据,并且如果我绑定(bind)到它就可以接收到该数据。
该类本身继承自 Thread
,并公开了一个 publish()
方法,我可以调用该方法来传递要通过 Queue( )
。
但是,在我的测试中,数据从未到达。我确保使用相同的端口,我想不出这里还有什么问题。
我是 ZeroMQ 新手,但我之前已经设法让 PubSub
模式正常工作。
测试代码:
# Import Built-ins
import time
import json
import queue
from queue import Queue
from threading import Thread
# Import Third-Party
import zmq
def test_publisher_sends_data(self):
port = 667
name, topic, data = 'TestNode', 'testing', ['this', 'is', 'data']
encoded_name = json.dumps(name).encode('utf-8')
encoded_topic = json.dumps(topic).encode('utf-8')
encoded_data = json.dumps(data).encode('utf-8')
expected_result = (encoded_name, encoded_topic, encoded_data)
publisher = Publisher(port)
print("starting publisher")
publisher.start()
q = Queue()
def recv(q):
ctx = zmq.Context()
zmq_sock = ctx.socket(zmq.SUB)
print("Connecting to publisher")
zmq_sock.connect('tcp://127.0.0.1:%s' % port)
while True:
print("waiting for data..")
q.put(zmq_sock.recv_multipart())
print("data received!")
t = Thread(target=recv, args=(q,))
t.start()
print("sending data via publisher")
for i in range(5):
self.assertTrue(publisher.publish(name, topic, data))
time.sleep(0.1)
print("checking q for received data..")
try:
result = q.get(block=False)
except queue.Empty:
self.fail("Queue was empty, no data received!")
self.assertEqual(expected_result, result)
Publisher
类
# Import Built-Ins
import logging
import json
from queue import Queue
from threading import Thread, Event
# Import Third-Party
import zmq
class Publisher(Thread):
"""Publisher Class which allows publishing data to subscribers.
The publishing is realized with ZMQ Publisher sockets, and supports publishing
to multiple subscribers.
The run() method continuosly checks for data on the internal q, which is fed
by the publish() method.
"""
def __init__(self, port, *args, **kwargs):
"""Initialize Instance.
:param port:
"""
self.port = port
self._running = Event()
self.sock = None
self.q = Queue()
super(Publisher, self).__init__(*args, **kwargs)
def publish(self, node_name, topic, data):
"""Publish the given data to all current subscribers.
All parameters must be json-serializable objects
:param data:
:return:
"""
message_parts = [json.dumps(param).encode('utf-8')
for param in (node_name, topic, data)]
if self.sock:
self.q.put(message_parts)
return True
else:
return False
def join(self, timeout=None):
self._running.clear()
try:
self.sock.close()
except Exception:
pass
super(Publisher, self).join(timeout)
def run(self):
self._running.set()
ctx = zmq.Context()
self.sock = ctx.socket(zmq.PUB)
self.sock.bind("tcp://*:%s" % self.port)
while self._running.is_set():
if not self.q.empty():
msg_parts = self.q.get(block=False)
print("Sending data:", msg_parts)
self.sock.send_multipart(msg_parts)
else:
continue
ctx.destroy()
self.sock = None
最佳答案
添加 .setsockopt( zmq.SUBSCRIBE, someNonZeroLengthSTRING )
作为
SUB
-socket 实例未订阅任何内容(自然)如果任何传入消息无法匹配任何字符串,SUB
端被订阅,本地 .recv()
自然不会收到这样的消息。
鉴于您的代码没有显式订阅,因此不存在可以满足主题过滤器处理条件 Q.E.D 的此类消息。
作为另一个问题——“迟到者”的麻烦——接下来可能会发生,如果单元测试设计是快速的,我可能会建议你进一步(不仅是 ZeroMQ)分布式系统设计的最佳下一步是花时间阅读 Pieter HINTJENS 的精彩著作 “Code Connected,第 1 卷”。任何认真研究异构分布式系统信令/消息传递的人都会喜欢他分享技术和非技术观点和意见。
关于python - ZeroMQ 订阅者在单元测试中不接收任何数据。为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45468217/
我有一个存储结构向量的应用程序。这些结构保存有关系统上每个 GPU 的信息,如内存和 giga-flop/s。每个系统上有不同数量的 GPU。 我有一个程序可以同时在多台机器上运行,我需要收集这些数据
我很好奇 MPI 中缺少此功能: MPI_Isendrecv( ... ); 即,非阻塞发送和接收,谁能告诉我其省略背后的基本原理? 最佳答案 我的看法是 MPI_SENDRECV存在是为了方便那些想
当我用以下方法监听TCP或UDP套接字时 ssize_t recv(int sockfd, void *buf, size_t len, int flags); 或者 ssize_t recvfrom
SUM:如何在 azure 事件网格中推迟事件触发或事件接收? 我设计的系统需要对低频对象状态(创建、启动、检查长时间启动状态、结束)使用react。它看起来像是事件处理的候选者。我想用azure函数
我正在 MPI 中实现一个程序,其中主进程(等级 = 0)应该能够接收来自其他进程的请求,这些进程要求只有根才知道的变量值。如果我按等级 0 进行 MPI_Recv(...),我必须指定向根发送请求的
我正在学习DX12,并在此过程中学习“旧版Win32”。 我在退出主循环时遇到问题,这似乎与我没有收到WM_CLOSE消息有关。 在C++,Windows 10控制台应用程序中。 #include
SUM:如何在 azure 事件网格中推迟事件触发或事件接收? 我设计的系统需要对低频对象状态(创建、启动、检查长时间启动状态、结束)使用react。它看起来像是事件处理的候选者。我想用azure函数
我想编写方法来通过号码发送短信并使用编辑文本字段中的文本。发送消息后,我想收到一些声音或其他东西来提醒我收到短信。我怎样才能做到这一点?先感谢您,狼。 最佳答案 这个网站似乎对两者都有很好的描述:ht
所以我正在用 Java 编写一个程序,在 DatagramSocket 和 DatagramPacket 的帮助下发送和接收数据。问题是,在我发送数据/接收数据之间的某个时间 - 我发送数据的程序中的
我是 Android 编程新手,我正在用 Java 编写一个应用程序,该应用程序可以打开相机拍照并保存。我通过 Intents 做到了,但看不到 onActivityResult 正在运行。 我已经在
我有一个套接字服务器和一个套接字客户端。客户端只有一个套接字。我必须使用线程在客户端发送/接收数据。 static int sock = -1; static std::mutex mutex; vo
我正在尝试使用 c 中的套接字实现 TCP 服务器/客户端。我以这样的方式编写程序,即我们在客户端发送的任何内容都逐行显示在服务器中,直到键入退出。该程序可以运行,但数据最后一起显示在服务器中。有人可
我正在使用微 Controller 与 SIM808 模块通信,我想发送和接收 AT 命令。 现在的问题是,对于某些命令,我只收到了我应该收到的答案的一部分,但对于其他一些命令,我收到了我应该
我用c设计了一个消息传递接口(interface),用于在我的系统中运行的不同进程之间提供通信。该接口(interface)为此目的创建 10-12 个线程,并使用 TCP 套接字提供通信。 它工作正
我需要澄清一下在套接字程序中使用多个发送/接收。我的客户端程序如下所示(使用 TCP SOCK_STREAM)。 send(sockfd,"Messgfromlient",15,0);
我正在构建一个真正的基本代理服务器到我现有的HTTP服务器中。将传入连接添加到队列中,并将信号发送到另一个等待线程队列中的一个线程。此线程从队列中获取传入连接并对其进行处理。 问题是代理程序真的很慢。
我正在使用 $routeProvider 设置一条类似 的路线 when('/grab/:param1/:param2', { controller: 'someController',
我在欧洲有通过 HLS 流式传输的商业流媒体服务器。http://europe.server/stream1/index.m3u8现在我在美国的客户由于距离而遇到一些网络问题。 所以我在美国部署了新服
我有一个长期运行的 celery 任务,该任务遍历一系列项目并执行一些操作。 任务应该以某种方式报告当前正在处理的项目,以便最终用户知道任务的进度。 目前,我的django应用程序和celery一起坐
我需要将音频文件从浏览器发送到 python Controller 。我是这样做的: var xmlHttp = new XMLHttpRequest(); xmlHttp.open( "POST",
我是一名优秀的程序员,十分优秀!