gpt4 book ai didi

python - Zeromq (pyzmq) ROUTER处理多个客户端的数据以及后续的超时处理

转载 作者:太空宇宙 更新时间:2023-11-03 16:37:54 24 4
gpt4 key购买 nike

我有一个ROUTER,其目的是积累来自多个DEALER客户端的图像数据并对完整图像执行OCR。我发现处理 OCR 最有效的方法是使用 Python 的多处理库;累积的图像字节被放入队列中,以便在单独的进程中进行适当处理。但是,我需要确保当客户端遇到超时时,Process 会正确终止,并且不会无意义地徘徊和占用资源。

在我当前的解决方案中,我将每个新连接的客户端插入到 dict 中,其中 value 是拥有所有图像数据的 ClientHandler 类并生成一个 Thread,当 5 秒过去时,该线程将名为“timeout”的 boolean 变量设置为 True。如果在 5 秒帧内收到新消息,则会调用 bump 并将计时器重置回 0,否则我会在线程终止之前进行清理,并从 dict< 中删除引用 在主循环中:

import threading
import time
import zmq

class ClientHandler(threading.Thread):
def __init__(self, socket):
self.elapsed = time.time()
self.timeout = False

self.socket = socket

super(ClientHandler, self).__init__()

def run(self):
while time.time() - self.elapsed < 5.0:
pass

self.timeout = True

# CLIENT TIMED OUT
# HANDLE TERMINATION AND CLEAN UP HERE

def bump(self):
self.elapsed = time.time()

def handle(self, id, header, data):
# HANDLE CLIENT DATA HERE
# ACCUMULATE IMAGE BYTES, ETC

self.socket.send_multipart([id, str(0)])

def server_task():
clients = dict()

context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)

server.setsockopt(zmq.RCVTIMEO, 0)

server.bind("tcp://127.0.0.1:7777")

while True:
try:
id, header, data = server.recv_multipart()

client = clients.get(id)

if client == None:
client = clients[id] = ClientHandler(server)

client.start()

client.bump()
client.handle(id, header, data)
except zmq.Again:
for id in clients.keys():
if clients[id].timeout:
del clients[id]

context.term()

if __name__ == "__main__":
server_task()

但是整个方法感觉不太正确。我这样做是否不当?如果是这样,如果有人能指出我正确的方向,我将不胜感激。

最佳答案

自己想出来的,希望对其他人有帮助。

我在分配的端口上有一个路由器,它将唯一的端口分配给每个客户端,然后连接到所述唯一端口上新绑定(bind)的套接字。当客户端断开连接时,端口将被回收以重新分配。

import sys
import zmq
from multiprocessing import Process, Queue, Value

def server_task():
context = zmq.Context.instance()

server = context.socket(zmq.ROUTER)

server.bind("tcp://127.0.0.1:7777")

timeout_queue = Queue()
port_list = [ 1 ]

proc_list = [ ]

while True:
try:
id = server.recv_multipart()[0]

# Get an unused port from the list
# Ports from clients that have timed out are recycled here

while not timeout_queue.empty():
port_list.append(timeout_queue.get())

port = port_list.pop()

if len(port_list) == 0:
port_list.append(port + 1)

# Spawn a new worker task, binding the port to a socket

proc_running = Value("b", True)

proc_list.append(proc_running)

Process(target=worker_task, args=(proc_running, port, timeout_queue)).start()

# Send the new port to the client

server.send_multipart([id, str(7777 + port)])

except KeyboardInterrupt:
break

# Safely allow our worker processes to terminate
for proc_running in proc_list:
proc_running.value = False

context.term()

def worker_task(proc_running, port, timeout_queue):
context = zmq.Context.instance()

worker = context.socket(zmq.ROUTER)

worker.setsockopt(zmq.RCVTIMEO, 5000)
worker.bind("tcp://127.0.0.1:%d" % (7777 + port, ))

while proc_running.value:
try:
id, data = worker.recv_multipart()

worker.send_multipart([id, data])
except zmq.Again:
timeout_queue.put(port)

context.term()

break

print("Client on port %d disconnected" % (7777 + port, ))

关于python - Zeromq (pyzmq) ROUTER处理多个客户端的数据以及后续的超时处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37060438/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com