gpt4 book ai didi

python - Python ZeroMQ 中的异步客户端/服务器模式

转载 作者:行者123 更新时间:2023-12-01 04:48:19 24 4
gpt4 key购买 nike

我有3个用Python编写的程序,需要连接。 2 个程序 X 和 Y 收集一些信息,并将这些信息发送给程序 Z。程序 Z 分析数据并向程序 X 和 Y 发送一些决策。类似X和Y的程序数量将来还会增加。最初我使用命名管道来允许从 X、Y 到 Z 的通信。但正如您所看到的,我需要双向关系。我的老板告诉我使用 ZeroMQ。我刚刚找到了适合我的用例的模式,称为异步客户端/服务器。请参阅下面 ZMQ 书籍 ( http://zguide.zeromq.org/py:all ) 中的代码。

问题是我的老板不想使用任何线程、 fork 等。我将客户端和服务器任务移至单独的程序,但我不确定如何处理 ServerWorker 类。可以在没有线程的情况下以某种方式使用它吗?另外,我想知道如何确定最佳的 worker 数量。

import zmq
import sys
import threading
import time
from random import randint, random

__author__ = "Felipe Cruz <felipecruz@loogica.net>"
__license__ = "MIT/X11"

def tprint(msg):
"""like print, but won't get newlines confused with multiple threads"""
sys.stdout.write(msg + '\n')
sys.stdout.flush()

class ClientTask(threading.Thread):
"""ClientTask"""
def __init__(self, id):
self.id = id
threading.Thread.__init__ (self)

def run(self):
context = zmq.Context()
socket = context.socket(zmq.DEALER)
identity = u'worker-%d' % self.id
socket.identity = identity.encode('ascii')
socket.connect('tcp://localhost:5570')
print('Client %s started' % (identity))
poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
reqs = 0
while True:
reqs = reqs + 1
print('Req #%d sent..' % (reqs))
socket.send_string(u'request #%d' % (reqs))
for i in range(5):
sockets = dict(poll.poll(1000))
if socket in sockets:
msg = socket.recv()
tprint('Client %s received: %s' % (identity, msg))

socket.close()
context.term()

class ServerTask(threading.Thread):
"""ServerTask"""
def __init__(self):
threading.Thread.__init__ (self)

def run(self):
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind('tcp://*:5570')

backend = context.socket(zmq.DEALER)
backend.bind('inproc://backend')

workers = []
for i in range(5):
worker = ServerWorker(context)
worker.start()
workers.append(worker)

poll = zmq.Poller()
poll.register(frontend, zmq.POLLIN)
poll.register(backend, zmq.POLLIN)

while True:
sockets = dict(poll.poll())
if frontend in sockets:
ident, msg = frontend.recv_multipart()
tprint('Server received %s id %s' % (msg, ident))
backend.send_multipart([ident, msg])
if backend in sockets:
ident, msg = backend.recv_multipart()
tprint('Sending to frontend %s id %s' % (msg, ident))
frontend.send_multipart([ident, msg])

frontend.close()
backend.close()
context.term()

class ServerWorker(threading.Thread):
"""ServerWorker"""
def __init__(self, context):
threading.Thread.__init__ (self)
self.context = context

def run(self):
worker = self.context.socket(zmq.DEALER)
worker.connect('inproc://backend')
tprint('Worker started')
while True:
ident, msg = worker.recv_multipart()
tprint('Worker received %s from %s' % (msg, ident))
replies = randint(0,4)
for i in range(replies):
time.sleep(1. / (randint(1,10)))
worker.send_multipart([ident, msg])

worker.close()

def main():
"""main function"""
server = ServerTask()
server.start()
for i in range(3):
client = ClientTask(i)
client.start()

server.join()

if __name__ == "__main__":
main()

最佳答案

所以,您从这里获取了代码:Asynchronous Client/Server Pattern

请密切注意显示此代码所针对的模型的图像。特别是,请查看“图 38 - 异步服务器的详细信息”。 ServerWorker 类正在旋转 5 个“Worker”节点。在代码中,这些节点是线程,但您可以使它们完全独立的程序。在这种情况下,您的服务器程序(可能)不会负责启动它们,它们会单独启动并仅与您的服务器通信它们已准备好接收工作。

您会在 ZMQ 示例中经常看到这种情况,这是在单个可执行文件中的线程中模仿的多节点拓扑。这只是为了让阅读整个内容变得容易,并不总是这样使用。

对于您的特定情况,让工作人员成为线程或将它们分解为单独的程序可能是有意义的......但如果这是您老板的业务要求,那么只需将它们分解为单独的程序即可。 p>

当然,为了回答你的第二个问题,如果不了解他们将要执行的工作负载以及他们需要多快的响应,就无法知道有多少 worker 是最佳的......你的目标是 worker 完成工作的速度比收到新工作的速度快。在许多情况下,这很有可能只需要一个 worker 就能完成。如果是这样,您可以让服务器本身成为工作人员,而只需跳过架构的整个“工作人员层”。为了简单起见,您应该从这里开始,并进行一些负载测试,看看它是否真的能有效地处理您的工作负载。如果没有,请了解完成一项任务需要多长时间,以及任务进入的速度有多快。假设一名工作人员可以在 15 秒内完成一项任务。每分钟 4 个任务。如果任务每分钟处理 5 个任务,则您需要 2 个工作人员,并且您将有一点增长空间。如果情况变化很大,那么您必须就资源与可靠性做出决定。

在深入了解之前,请确保阅读第 4 章“可靠的请求/答复模式”,它将提供一些处理异常的见解,并可能为您提供更好的遵循模式。

关于python - Python ZeroMQ 中的异步客户端/服务器模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28942092/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com