- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我需要将字典作为消息从发布者发送到订阅者。在 REQ/REP 模式下 send_json 和 recv_json 工作得很好,但我一直找不到适用于 PUB/SUB 的正确写法。希望 PUB/SUB 并不是只能使用 send() 和 recv()。
这是我整理的实验代码清单:
"""
Experiments with 0MQ PUB/SUB pattern
"""
import os
import sys
import time
import zmq
from multiprocessing import Process
from random import sample, choice
import signal
def handler(signum, frame):
    """SIGTERM handler: terminate every live child process, then exit.

    ``signum`` and ``frame`` are the arguments the ``signal`` module passes
    to any handler; neither is used here.
    """
    try:
        # _procd maps a process name to its Process object; only the
        # objects are needed to shut the children down.
        for child in _procd.itervalues():
            if not child:
                continue
            if child.is_alive():
                child.terminate()
    finally:
        # Reached even when terminating a child raised.
        os._exit(0)


# Route SIGTERM to the cleanup handler defined above.
signal.signal(signal.SIGTERM, handler)
# TCP port the publisher binds to and the clients connect to.
PORT = 5566
# Topic names: the string is iterated and sampled one character at a time,
# so every letter is its own topic.
TOPICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
def publisher():
    """ Randomly update and publish topics.

    Binds a PUB socket on PORT and builds a topic -> timestamp dict. It then
    loops forever: once per second it picks a random topic, refreshes that
    topic's timestamp and sends the pair (topic, alltopics) via send_json().
    """
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.bind("tcp://*:{}".format(PORT))
    ## Init a dict of topic:value pairs
    alltopics = dict()
    for char in TOPICS:
        alltopics[char] = time.time()
    while True:
        topic = choice(TOPICS)
        alltopics[topic] = time.time()
        ## THIS IS SENDING
        # NOTE(review): the payload is the JSON encoding of a (topic, dict)
        # tuple, so presumably the bytes on the wire begin with '[' rather
        # than with the topic letter -- confirm against how SUB
        # subscription filters are matched.
        sock.send_json((topic, alltopics))
        print "Sent topic {}".format(topic)
        time.sleep(1)
def client(number, topics):
    """
    Subscribe to list of topics and wait for messages.

    number -- client index; only referenced by the commented-out print below.
    topics -- iterable of topic strings, each passed to zmq.SUBSCRIBE.
    """
    context = zmq.Context()
    sock = context.socket(zmq.SUB)
    sock.connect("tcp://localhost:{}".format(PORT))
    for topic in topics:
        sock.setsockopt(zmq.SUBSCRIBE, topic)
    print "subscribed to topics {}".format(topics)
    while True:
        ## THIS NEVER RETURNS
        print sock.recv_json()
        ## ALREADY TRIED THIS. DOES NOT WORK
        #topic, msg = sock.recv_json()
        #print "Client{} {}:{}".format(number, topic, msg[topic])
        # Flush stdout after each received message.
        sys.stdout.flush()
if __name__ == '__main__':
    # Registry of child processes keyed by name; handler() walks it when
    # SIGTERM arrives.
    _procd = {}

    # Start the single publisher process.
    name = 'publisher'
    worker = Process(target=publisher, name=name)
    _procd[name] = worker
    worker.start()

    # Start ten subscribers, each with three randomly sampled topics.
    for n in range(10):
        name = 'client{}'.format(n)
        subscriptions = sample(TOPICS, 3)
        worker = Process(target=client, name=name, args=(n, subscriptions))
        _procd[name] = worker
        worker.start()

    # Keep the parent alive until it is killed.
    while True:
        time.sleep(1)
这是我终止父进程之前的输出
$ python pubsub.py
Sent topic Y
subscribed to topics ['B', 'Q', 'F']
subscribed to topics ['N', 'E', 'O']
subscribed to topics ['Y', 'G', 'M']
subscribed to topics ['G', 'D', 'I']
subscribed to topics ['D', 'Y', 'W']
subscribed to topics ['A', 'N', 'W']
subscribed to topics ['F', 'K', 'V']
subscribed to topics ['A', 'Q', 'X']
subscribed to topics ['S', 'Y', 'V']
subscribed to topics ['E', 'S', 'D']
Sent topic I
Sent topic N
Sent topic J
Sent topic I
Sent topic A
Sent topic T
Sent topic A
Sent topic K
Sent topic V
Sent topic E
订阅和发送似乎没问题,但客户端从不打印任何内容。客户端进程的回溯显示它们卡在 sock.recv_json() 调用上。我的第一次尝试(已被注释掉)同样会挂起。
最佳答案
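我仍然希望看到它能与 send_json() 和 recv_json() 一起工作,但是,根据 Jason 的建议,以下做法是有效的(SUB 套接字按消息开头的字节做前缀匹配来过滤主题,而 send_json((topic, alltopics)) 发出的数据以 JSON 列表的 '[' 开头,因此任何订阅都不会匹配;把主题放在消息最前面即可解决):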
def mogrify(topic, msg):
    """Build the wire string for a message: topic, one space, JSON body."""
    payload = json.dumps(msg)
    return ' '.join((topic, payload))
def demogrify(topicmsg):
    """Inverse of mogrify(): split a wire message into (topic, payload).

    topicmsg -- text of the form '<topic> <json>' as built by mogrify().
                Raw bytes (what a socket returns on Python 3) are decoded
                as UTF-8 first.

    Returns a (topic, msg) tuple where msg is the decoded JSON value.
    Raises ValueError when no JSON document can be decoded.
    """
    # On Python 2 ``bytes is str``, so this branch only fires for Python 3
    # byte strings.
    if isinstance(topicmsg, bytes) and not isinstance(topicmsg, str):
        topicmsg = topicmsg.decode('utf-8')

    # Dict payloads: everything from the first '{' onward is the JSON body.
    json0 = topicmsg.find('{')
    if json0 >= 0:
        try:
            return topicmsg[:json0].strip(), json.loads(topicmsg[json0:])
        except ValueError:
            # The '{' was inside a non-dict payload (e.g. a list of dicts
            # or a string); use the delimiter-based split below instead.
            pass

    # Any other payload (list, number, string, null): mogrify() puts one
    # space between the topic and the JSON body, so split on the first one.
    topic, _, payload = topicmsg.partition(' ')
    return topic, json.loads(payload)
在 publisher() 中使用这个
sock.send(mogrify(topic, alltopics))
在 client() 中使用这个
topic, msg = demogrify(sock.recv())
这是完整的代码清单,后面是一些示例输出:
#!/usr/bin/env python
# coding: utf8
"""
Experiments with 0MQ PUB/SUB pattern.
Creates a publisher with 26 topics (A, B, ... Z) and
spawns clients that randomly subscribe to a subset
of the available topics. Console output shows
who subscribed to what, when topic updates are sent
and when clients receive the messages.
Runs until killed.
Author: Michael Ellis
License: WTFPL
"""
import os
import sys
import time
import zmq
from multiprocessing import Process
from random import sample, choice
import json
PORT = 5566 # TCP port: the publisher binds to it, the clients connect to it.
TOPICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" # One topic per character: 'A', 'B', ... 'Z'.
PUBSLEEP = 0.01 # Sleep time (seconds) at bottom of publisher() loop.
NCLIENTS = 10 # Number of clients spawned.
NSUBS = 3 # Number of topics each client subscribes to.
# Each client draws NSUBS distinct topics with sample(TOPICS, NSUBS), which
# cannot draw more topics than exist.
assert NSUBS <= len(TOPICS)
def mogrify(topic, msg):
    """Build the wire string for a message: topic, one space, JSON body."""
    payload = json.dumps(msg)
    return ' '.join((topic, payload))
def demogrify(topicmsg):
    """Inverse of mogrify(): split a wire message into (topic, payload).

    topicmsg -- text of the form '<topic> <json>' as built by mogrify().
                Raw bytes (what a socket returns on Python 3) are decoded
                as UTF-8 first.

    Returns a (topic, msg) tuple where msg is the decoded JSON value.
    Raises ValueError when no JSON document can be decoded.
    """
    # On Python 2 ``bytes is str``, so this branch only fires for Python 3
    # byte strings.
    if isinstance(topicmsg, bytes) and not isinstance(topicmsg, str):
        topicmsg = topicmsg.decode('utf-8')

    # Dict payloads: everything from the first '{' onward is the JSON body.
    json0 = topicmsg.find('{')
    if json0 >= 0:
        try:
            return topicmsg[:json0].strip(), json.loads(topicmsg[json0:])
        except ValueError:
            # The '{' was inside a non-dict payload (e.g. a list of dicts
            # or a string); use the delimiter-based split below instead.
            pass

    # Any other payload (list, number, string, null): mogrify() puts one
    # space between the topic and the JSON body, so split on the first one.
    topic, _, payload = topicmsg.partition(' ')
    return topic, json.loads(payload)
def publisher():
""" Randomly update and publish topics """
context = zmq.Context()
sock = context.socket(zmq.PUB)
sock.bind("tcp://*:{}".format(PORT))
## Init a dict of topic:value pairs
alltopics = dict()
for char in TOPICS:
alltopics[char] = time.time()
while True:
try:
topic = choice(TOPICS)
alltopics[topic] = time.time()
sock.send(mogrify(topic, alltopics))
print "Sent topic {}".format(topic)
time.sleep(PUBSLEEP)
except KeyboardInterrupt:
sys.exit()
def client(number, topics):
"""
Subscribe to list of topics and wait for messages.
"""
context = zmq.Context()
sock = context.socket(zmq.SUB)
sock.connect("tcp://localhost:{}".format(PORT))
for topic in topics:
sock.setsockopt(zmq.SUBSCRIBE, topic)
print "subscribed to topics {}".format(topics)
while True:
try:
topic, msg = demogrify(sock.recv())
print "Client{} {}:{}".format(number, topic, msg[topic])
sys.stdout.flush()
except KeyboardInterrupt:
sys.exit()
# Child processes keyed by name; filled in by run() and walked by the
# SIGTERM handler.
_procd = {}


def run():
    """Start the publisher and NCLIENTS subscribers, then idle forever."""
    # One publisher process.
    pub_name = 'publisher'
    child = Process(target=publisher, name=pub_name)
    _procd[pub_name] = child
    child.start()

    # NCLIENTS subscriber processes, each with NSUBS randomly sampled topics.
    for idx in range(NCLIENTS):
        label = 'client{}'.format(idx)
        picks = sample(TOPICS, NSUBS)
        child = Process(target=client, name=label, args=(idx, picks))
        _procd[label] = child
        child.start()

    # Keep the parent alive until it is killed.
    while True:
        time.sleep(1)
if __name__ == '__main__':
    import signal

    def handler(signum, frame):
        """On SIGTERM, terminate every live child process and exit."""
        try:
            # Only the Process objects are needed, not their names.
            for child in _procd.itervalues():
                if not child:
                    continue
                if child.is_alive():
                    child.terminate()
        finally:
            # Exit even when terminating a child raised.
            sys.exit()

    signal.signal(signal.SIGTERM, handler)
    run()
示例输出
$ pubsub.py
Sent topic Q
subscribed to topics ['R', 'G', 'S']
subscribed to topics ['J', 'K', 'C']
subscribed to topics ['L', 'B', 'P']
subscribed to topics ['X', 'Z', 'A']
subscribed to topics ['K', 'O', 'R']
subscribed to topics ['J', 'Z', 'T']
subscribed to topics ['R', 'G', 'P']
subscribed to topics ['Y', 'A', 'O']
subscribed to topics ['U', 'S', 'C']
subscribed to topics ['B', 'P', 'L']
Sent topic U
Client8 U:1407506576.27
Sent topic E
Sent topic A
Client3 A:1407506576.29
Client7 A:1407506576.29
Sent topic A
Client3 A:1407506576.31
Client7 A:1407506576.31
Sent topic G
Client0 G:1407506576.32
Client6 G:1407506576.32
Sent topic E
Sent topic B
Client2 B:1407506576.34
Client9 B:1407506576.34
Sent topic R
Client0 R:1407506576.35
Client6 R:1407506576.35
Client4 R:1407506576.35
Sent topic U
Client8 U:1407506576.36
...
关于python - 如何将 send_json 与 pyzmq PUB SUB 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25188792/
这是我的代码,去掉了无关的东西: 协调器.py context = zmq.Context() socket = context.socket(zmq.ROUTER) port = socket.bi
我需要将字典作为消息从发布者发送到订阅者。使用 REQ/REP 模式 send_json 和 recv_json 工作得很好,但我似乎找不到适用于 PUB/SUB 的咒语。希望 PUB/SUB 不是只
我使用 zmq 和 python 以及 REQ-REP 方案来传输数据。我现在正在使用方法 send_json 来发送数据。但出于某种奇怪的原因,对于某些示例它有效,而对于其他示例则不行。 发生错误时
我是一名优秀的程序员,十分优秀!