gpt4 book ai didi

python-3.x - 具有异步功能的 Python 多处理

转载 作者:行者123 更新时间:2023-12-05 08:52:57 24 4
gpt4 key购买 nike

我搭建了一个websocket服务器,简化版如下所示:

import websockets, subprocess, asyncio, json, re, os, sys
from multiprocessing import Process

def docker_command(command_words):
return subprocess.Popen(
["docker"] + command_words,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)

async def check_submission(websocket:object, submission:dict):
exercise=submission["exercise"]
with docker_command(["exec", "-w", "badkan", "grade_exercise", exercise]) as proc:
for line in proc.stdout:
print("> " + line)
await websocket.send(line)

async def run(websocket, path):
submission_json = await websocket.recv() # returns a string
submission = json.loads(submission_json) # converts the string to a python dict

####
await check_submission(websocket, submission)


websocketserver = websockets.server.serve(run, '0.0.0.0', 8888, origins=None)
asyncio.get_event_loop().run_until_complete(websocketserver)
asyncio.get_event_loop().run_forever()

当一次只有一个用户时它工作正常。但是,当多个用户尝试使用服务器时,服务器会依次处理他们,因此后面的用户必须等待很长时间。

我试图通过将标有“####”(“await check_submission ...”)的行替换为:

p = Process(target=check_submission, args=(websocket, submission,))
p.start()

但是,它没有用——我收到运行时警告:“协程:‘check_submission’从未等待”,而且我没有看到任何来自 websocket 的输出。

我还尝试将这些行替换为:

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
await loop.run_in_executor(None, check_submission, websocket, submission)

但出现了不同的错误:“无法腌制 asyncio.Future 对象”。

我如何构建这个多处理 websocket 服务器?

最佳答案

这是我的例子,asyncio.run() 对我有用,使用多进程启动异步函数

class FlowConsumer(Base):
def __init__(self):
pass

async def run(self):
self.logger("start consumer process")
while True:
# get flow from queue
flow = {}
# call flow executor get result
executor = FlowExecutor(flow)
rtn = FlowResult()
try:
rtn = await executor.run()
except Exception as e:
self.logger("flow run except:{}".format(traceback.format_exc()))
rtn.status = FLOW_EXCEPT
rtn.msg = str(e)
self.logger("consumer flow finish,result:{}".format(rtn.dict()))
time.sleep(1)

def process(self):
asyncio.run(self.run())


processes = []
consumer_proc_count = 3

# start multi consumer processes
for _ in range(consumer_proc_count):
# old version
# p = Process(target=FlowConsumer().run)
p = Process(target=FlowConsumer().process)
p.start()
processes.append(p)

for p in processes:
p.join()

关于python-3.x - 具有异步功能的 Python 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55088249/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com