gpt4 book ai didi

路由中的 Python3 Flask asyncio 子进程挂起

转载 作者:太空宇宙 更新时间:2023-11-04 07:48:50 28 4
gpt4 key购买 nike

我在 Ubuntu 18.04 上使用 Flask 1.0.2 和 Python 3.6。我的应用程序应该使用 asyncio 和 asyncio.create_subprocess_exec()启动后台脚本,从中读取标准输出,然后在脚本完成后返回状态。

我基本上是想从这篇文章中实现一个答案:
Non-blocking read on a subprocess.PIPE in python

该脚本已成功启动,我从中获得了所有预期的输出,但问题是它永远不会返回(意味着永远不会到达 Killing subprocess now 行)。当我从 Linux 终端检查进程列表 ( ps ) 时,后台脚本已退出。

我做错了什么,如何成功突破async for line in process.stdout环形?

在我导入后的文件顶部,我创建了我的事件循环:

# Create a loop to run all the tasks in.
global eventLoop ; asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)

我在我的路线上方定义了我的异步协程:
async def readAsyncFunctionAndKill(cmd):
# Use global event loop
global eventLoop

print("[%s] Starting async Training Script ..." % (os.path.basename(__file__)))
process = await asyncio.create_subprocess_exec(cmd,stdout=PIPE,loop=eventLoop)
print("[%s] Starting to read stdout ..." % (os.path.basename(__file__)))
async for line in process.stdout:
line = line.decode(locale.getpreferredencoding(False))
print("%s"%line, flush=True)
print("[%s] Killing subprocess now ..." % (os.path.basename(__file__)))
process.kill()
print("[%s] Training process return code was: %s" % (os.path.basename(__file__), process.returncode))
return await process.wait() # wait for the child process to exit

我的(缩写)路线在这里:
@app.route("/train_model", methods=["GET"])
def train_new_model():
# Use global event loop
global eventLoop

with closing(eventLoop):
eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))

return jsonify("done"), 200

调用的“s.py”脚本被标记为可执行文件并且位于同一工作目录中。缩写脚本如下所示(它包含几个子进程并实例化 PyTorch 类):
def main():

# Ensure that swap is activated since we don't have enough RAM to train our model otherwise
print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
subprocess.call("swapon -a", shell=True)

# Need to initialize GPU
print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
defaults.device = torch.device("cuda")
with torch.cuda.device(0):
torch.tensor([1.]).cuda()

print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__),torch.cuda.is_available(),torch.cuda.get_device_name(0)))

try:

print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))


# Batch size
bs = 16
#bs = 8

# Create ImageBunch
tfms = get_transforms(do_flip=True,
flip_vert=True,
max_rotate=180.,
max_zoom=1.1,
max_lighting=0.5,
max_warp=0.1,
p_affine=0.75,
p_lighting=0.75)

# Create databunch using folder names as class names
# This also applies the transforms and batch size to the data
os.chdir(TRAINING_DIR)
data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)

...

# Create a new learner with an early stop callback
learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])

...

print("[%s] All done training ..." % (os.path.basename(__file__)))

# Success
sys.exit(0)

except Exception as err:

print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__),err))
sys.exit(255)

if __name__== "__main__":
main()

最佳答案

这里有几个问题:

  • 您在导入时创建了一个新的事件循环,一次,但在您的 View 中关闭事件循环。根本不需要关闭循环,因为第二个请求现在将失败,因为循环已关闭。
  • asyncio 事件循环不是线程安全的,不应在线程之间共享。绝大多数 Flask 部署将使用线程来处理传入的请求。您的代码带有应如何处理的回声,但不幸的是,这不是正确的方法。例如。 asyncio.get_child_watcher().attach_loop(eventLoop)主要是多余的,因为 eventLoop = asyncio.new_event_loop() ,如果在主线程上运行,已经做到了。
    这是您所看到的问题的主要候选者。
  • 您的代码假定可执行文件实际上存在且可执行。您应该处理 OSError异常(和子类),因为不合格 s.py仅当它被设为可执行时才有效,以 #! 开头shebang 线,可在 PATH 上找到.它不会仅仅因为它在同一个目录中而工作,你也不想依赖当前的工作目录。
  • 您的代码假定该进程在某个时候关闭标准输出。如果子进程从不关闭标准输出(进程退出时自动发生的事情),那么您的 async for line in process.stdout:循环也会永远等待。考虑向代码添加超时以避免被错误的子进程阻塞。

  • 在多线程应用程序中使用 asyncio 子进程时,您确实想阅读 Python asyncio 文档中的两个部分:
  • Concurrency and Multithreading section ,解释说几乎所有的 asyncio 对象都不是线程安全的。您不想直接从其他线程向循环中添加任务;你想为每个线程使用一个事件循环,或者使用 asyncio.run_coroutine_threadsafe() function在特定线程中的循环上运行协程。
  • 对于 3.7 以下的 Python 版本,您还需要阅读 Subprocess and Threads section , 因为直到那个版本 asyncio使用非阻塞 os.waitpid(-1, os.WNOHANG)调用跟踪子状态并依赖于使用信号处理(只能在主线程上完成)。 Python 3.8 移除了这个限制(通过添加一个新的 child watcher implementation,它在单独的线程中使用阻塞的每个进程 os.waitpid() 调用,以额外的内存为代价。
    但是,您不必坚持默认的子观察者策略。您可以使用 EventLoopPolicy.set_child_watcher() 并传入 different process watcher instance .实际上,这意味着向后移植 3.8 ThreadedChildWatcher implementation .

  • 对于您的用例,确实不需要为每个线程运行一个新的事件循环。根据需要在单独的线程中运行单个循环。如果您在单独的线程中使用循环,根据您的 Python 版本,您可能还需要在主线程上运行循环或使用不同的进程观察器。一般来说,在 WSGI 服务器的主线程上运行 asyncio 循环并不容易,甚至不可能。
    因此,您需要在单独的线程中永久运行一个循环,并且您需要使用一个无需主线程循环即可工作的子进程观察器。这是一个实现,这应该适用于 Python 3.6 及更高版本:
    import asyncio
    import itertools
    import logging
    import time
    import threading

    try:
    # Python 3.8 or newer has a suitable process watcher
    asyncio.ThreadedChildWatcher
    except AttributeError:
    # backport the Python 3.8 threaded child watcher
    import os
    import warnings

    # Python 3.7 preferred API
    _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)

    class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
    def __init__(self):
    self._pid_counter = itertools.count(0)
    self._threads = {}

    def is_active(self):
    return True

    def close(self):
    pass

    def __enter__(self):
    return self

    def __exit__(self, exc_type, exc_val, exc_tb):
    pass

    def __del__(self, _warn=warnings.warn):
    threads = [t for t in list(self._threads.values()) if t.is_alive()]
    if threads:
    _warn(
    f"{self.__class__} has registered but not finished child processes",
    ResourceWarning,
    source=self,
    )

    def add_child_handler(self, pid, callback, *args):
    loop = _get_running_loop()
    thread = threading.Thread(
    target=self._do_waitpid,
    name=f"waitpid-{next(self._pid_counter)}",
    args=(loop, pid, callback, args),
    daemon=True,
    )
    self._threads[pid] = thread
    thread.start()

    def remove_child_handler(self, pid):
    # asyncio never calls remove_child_handler() !!!
    # The method is no-op but is implemented because
    # abstract base class requires it
    return True

    def attach_loop(self, loop):
    pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
    assert expected_pid > 0

    try:
    pid, status = os.waitpid(expected_pid, 0)
    except ChildProcessError:
    # The child process is already reaped
    # (may happen if waitpid() is called elsewhere).
    pid = expected_pid
    returncode = 255
    logger.warning(
    "Unknown child process pid %d, will report returncode 255", pid
    )
    else:
    if os.WIFSIGNALED(status):
    returncode = -os.WTERMSIG(status)
    elif os.WIFEXITED(status):
    returncode = os.WEXITSTATUS(status)
    else:
    returncode = status

    if loop.get_debug():
    logger.debug(
    "process %s exited with returncode %s", expected_pid, returncode
    )

    if loop.is_closed():
    logger.warning("Loop %r that handles pid %r is closed", loop, pid)
    else:
    loop.call_soon_threadsafe(callback, pid, returncode, *args)

    self._threads.pop(expected_pid)

    # add the watcher to the loop policy
    asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())

    __all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

    logger = logging.getLogger(__name__)

    class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
    name = f"{type(self).__name__}-{next(self._count)}"
    super().__init__(name=name, daemon=True)

    def __repr__(self):
    loop, r, c, d = self.loop, False, True, False
    if loop is not None:
    r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
    return (
    f"<{type(self).__name__} {self.name} id={self.ident} "
    f"running={r} closed={c} debug={d}>"
    )

    def run(self):
    self.loop = loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
    loop.run_forever()
    finally:
    try:
    shutdown_asyncgens = loop.shutdown_asyncgens()
    except AttributeError:
    pass
    else:
    loop.run_until_complete(shutdown_asyncgens)
    loop.close()
    asyncio.set_event_loop(None)

    def stop(self):
    loop, self.loop = self.loop, None
    if loop is None:
    return
    loop.call_soon_threadsafe(loop.stop)
    self.join()

    _lock = threading.Lock()
    _loop_thread = None

    def get_event_loop():
    global _loop_thread
    if _loop_thread is None:
    with _lock:
    if _loop_thread is None:
    _loop_thread = EventLoopThread()
    _loop_thread.start()
    # give the thread up to a second to produce a loop
    deadline = time.time() + 1
    while not _loop_thread.loop and time.time() < deadline:
    time.sleep(0.001)

    return _loop_thread.loop

    def stop_event_loop():
    global _loop_thread
    with _lock:
    if _loop_thread is not None:
    _loop_thread.stop()
    _loop_thread = None

    def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
    以上是我为 Make a Python asyncio call from a Flask route 发布的相同的通用“使用 Flask 运行异步”解决方案,但添加了 ThreadedChildWatcher向后移植。
    然后您可以使用从 get_event_loop() 返回的循环。运行子进程,通过调用 run_coroutine_threadsafe() :
    import asyncio
    import locale
    import logging

    logger = logging.getLogger(__name__)


    def get_command_output(cmd, timeout=None):
    encoding = locale.getpreferredencoding(False)

    async def run_async():
    try:
    process = await asyncio.create_subprocess_exec(
    cmd, stdout=asyncio.subprocess.PIPE)
    except OSError:
    logging.exception("Process %s could not be started", cmd)
    return

    async for line in process.stdout:
    line = line.decode(encoding)
    # TODO: actually do something with the data.
    print(line, flush=True)

    process.kill()
    logging.debug("Process for %s exiting with %i", cmd, process.returncode)

    return await process.wait()

    future = run_coroutine(run_async())
    result = None
    try:
    result = future.result(timeout)
    except asyncio.TimeoutError:
    logger.warn('The child process took too long, cancelling the task...')
    future.cancel()
    except Exception as exc:
    logger.exception(f'The child process raised an exception')
    return result
    请注意,上述函数可能需要超时(以秒为单位),这是您等待子进程完成的最长时间。

    关于路由中的 Python3 Flask asyncio 子进程挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58547753/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com