gpt4 book ai didi

zeromq - ZeroRPC 发布订阅

转载 作者:行者123 更新时间:2023-12-03 18:12:49 26 4
gpt4 key购买 nike

我想在我的服务器之间建立一个基于事件的系统。例如,当包装我的数据库逻辑的服务器更改状态时,我希望它通知我的其他服务器。发布/订阅设计似乎很适合这个,我听说过有关 ZeroRPC 的好消息。

有些人提到使用 zerorpc 流来完成发布/订阅,但是我不清楚如何使用流来触发事件。

最佳答案

在 dotCloud,我们通过 zerorpc 流使用了大量发布/订阅。让我描述一下我们的做法。

总之

我们公开了一个用@zerorpc.stream 修饰的流方法。调用此方法时,会将 gevent.queue 添加到集合中。然后该方法将永远循环,产生到达队列的每条消息。当此方法终止时(因为客户端断开连接),队列从集合中移除。

要发布,只需在集合中注册的每个队列上发布要发布的消息。此时,您必须决定要对缓慢的消费者采取什么措施(断开它们的连接,将它们排队到一定的限制和/或丢弃新消息)。

使用 zerorpc-python 的实现示例:

订阅部分

class MyService(object):
def __init__(self):
self._subscribers = set()

@zerorpc.stream
def subscribe(self):
try:
queue = gevent.queue.Queue()
self._subscribers.add(queue)
for msg in queue:
yield msg
finally:
self._subscribers.remove(queue)

subscribe 方法只是将事件队列添加到集合中。然后永远消耗队列,直到:
- 队列以 StopIteration 消息结束(参见 gevent.queue.Queue 文档)
- 运行订阅功能的greenlet被杀死(通常是因为客户端断开连接)

在这两种情况下,都会执行 finally 语句,并将队列从订阅者列表中删除。

请注意,此时可以限制队列的大小: ...Queue(maxsize=42)

出版部分
class MyService(object):
[...]

def _publish(self, msg):
for queue in self._subscribers:
if queue.size < 42:
queue.put(msg)

调用此方法发布消息。它将遍历所有订阅者队列以将消息放入其中。在我的示例中,如果队列达到特定大小,我将丢弃该消息。但是您想在那里应用什么样的模式没有限制。

您可以将订阅者的greenlet实例存储在集合中,然后在队列已满时将其杀死,有效断开慢速客户端(您甚至可以尝试发送消息通知客户端太慢)。您也可以等待所有消费者在从 _publish 等返回之前并行处理消息。我的 friend ,天空是极限!

希望有帮助!

关于zeromq - ZeroRPC 发布订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11979319/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com