gpt4 book ai didi

python - Tornado 中的队列和 ProcessPoolExecutor

转载 作者:行者123 更新时间:2023-12-01 04:22:31 25 4
gpt4 key购买 nike

我正在尝试使用新的 Tornado queue对象以及 concurrent.futures允许我的网络服务器将 CPU 密集型任务传递给其他进程。我想访问从 concurrent.futures 模块的 ProcessPoolExecutor 返回的 Future 对象,以便我可以查询其状态以显示在前端(例如显示进程当前正在运行;显示它已完成)。

我似乎使用这种方法有两个障碍:

  1. 如何向 ProcessPoolExecutor 提交多个 q.get() 对象,同时还可以访问返回的 Future 对象?<
  2. 如何让 HomeHandler 访问由 ProcessPoolExecutor 返回的 Future 对象,以便我可以显示该进程的状态信息前端?

感谢您的帮助。

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

from concurrent.futures import ProcessPoolExecutor

define("port", default=8888, help="run on the given port", type=int)
q = Queue(maxsize=2)


def expensive_function(input_dict):
gen.sleep(1)


@gen.coroutine
def consumer():
while True:
input_dict = yield q.get()
try:
with ProcessPoolExecutor(max_workers=4) as executor:
future = executor.submit(expensive_function, input_dict)
finally:
q.task_done()


@gen.coroutine
def producer(input_dict):
yield q.put(input_dict)


class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/", HomeHandler),
]
settings = dict(
blog_title=u"test",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
debug=True,
)
super(Application, self).__init__(handlers, **settings)


class HomeHandler(tornado.web.RequestHandler):
def get(self):
self.render("home.html")

def post(self, *args, **kwargs):
input_dict = {'foo': 'bar'}

producer(input_dict)

self.redirect("/")


def main():
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(options.port)
tornado.ioloop.IOLoop.current().start()


def start_consumer():
tornado.ioloop.IOLoop.current().spawn_callback(consumer)


if __name__ == "__main__":
tornado.ioloop.IOLoop.current().run_sync(start_consumer)
main()

最佳答案

您想通过组合 QueueProcessPoolExecutor 来实现什么目的?执行器已经拥有自己的内部队列。您需要做的就是使 ProcessPoolExecutor 成为全局的(它不必是全局的,但即使您保留队列,您也会想做一些类似于全局的事情;它不每次通过消费者循环创建一个新的 ProcessPoolExecutor 并直接从处理程序向其提交内容是没有意义的。

@gen.coroutine
def post(self):
input_dict = ...
result = yield executor.submit(expensive_function, input_dict)

关于python - Tornado 中的队列和 ProcessPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33553940/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com