- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我在 Ubuntu 18.04 上使用 Flask 1.0.2 和 Python 3.6。我的应用程序应该使用 asyncio 和 asyncio.create_subprocess_exec()
启动后台脚本,从中读取标准输出,然后在脚本完成后返回状态。
我基本上是想从这篇文章中实现一个答案:
Non-blocking read on a subprocess.PIPE in python
该脚本已成功启动,我从中获得了所有预期的输出,但问题是它永远不会返回(意味着永远不会到达 Killing subprocess now
行)。当我从 Linux 终端检查进程列表 ( ps
) 时,后台脚本已退出。
我做错了什么,如何成功突破async for line in process.stdout
环形?
在我导入后的文件顶部,我创建了我的事件循环:
# Create a loop to run all the tasks in.
global eventLoop ; asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)
async def readAsyncFunctionAndKill(cmd):
# Use global event loop
global eventLoop
print("[%s] Starting async Training Script ..." % (os.path.basename(__file__)))
process = await asyncio.create_subprocess_exec(cmd,stdout=PIPE,loop=eventLoop)
print("[%s] Starting to read stdout ..." % (os.path.basename(__file__)))
async for line in process.stdout:
line = line.decode(locale.getpreferredencoding(False))
print("%s"%line, flush=True)
print("[%s] Killing subprocess now ..." % (os.path.basename(__file__)))
process.kill()
print("[%s] Training process return code was: %s" % (os.path.basename(__file__), process.returncode))
return await process.wait() # wait for the child process to exit
@app.route("/train_model", methods=["GET"])
def train_new_model():
# Use global event loop
global eventLoop
with closing(eventLoop):
eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))
return jsonify("done"), 200
def main():
# Ensure that swap is activated since we don't have enough RAM to train our model otherwise
print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
subprocess.call("swapon -a", shell=True)
# Need to initialize GPU
print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
defaults.device = torch.device("cuda")
with torch.cuda.device(0):
torch.tensor([1.]).cuda()
print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__),torch.cuda.is_available(),torch.cuda.get_device_name(0)))
try:
print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))
# Batch size
bs = 16
#bs = 8
# Create ImageBunch
tfms = get_transforms(do_flip=True,
flip_vert=True,
max_rotate=180.,
max_zoom=1.1,
max_lighting=0.5,
max_warp=0.1,
p_affine=0.75,
p_lighting=0.75)
# Create databunch using folder names as class names
# This also applies the transforms and batch size to the data
os.chdir(TRAINING_DIR)
data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)
...
# Create a new learner with an early stop callback
learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])
...
print("[%s] All done training ..." % (os.path.basename(__file__)))
# Success
sys.exit(0)
except Exception as err:
print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__),err))
sys.exit(255)
if __name__== "__main__":
main()
最佳答案
这里有几个问题:
asyncio.get_child_watcher().attach_loop(eventLoop)
主要是多余的,因为 eventLoop = asyncio.new_event_loop()
,如果在主线程上运行,已经做到了。OSError
异常(和子类),因为不合格 s.py
仅当它被设为可执行时才有效,以 #!
开头shebang 线,可在 PATH
上找到.它不会仅仅因为它在同一个目录中而工作,你也不想依赖当前的工作目录。async for line in process.stdout:
循环也会永远等待。考虑向代码添加超时以避免被错误的子进程阻塞。asyncio.run_coroutine_threadsafe()
function在特定线程中的循环上运行协程。asyncio
使用非阻塞 os.waitpid(-1, os.WNOHANG)
调用跟踪子状态并依赖于使用信号处理(只能在主线程上完成)。 Python 3.8 移除了这个限制(通过添加一个新的 child watcher implementation,它在单独的线程中使用阻塞的每个进程 os.waitpid()
调用,以额外的内存为代价。EventLoopPolicy.set_child_watcher()
并传入 different process watcher instance .实际上,这意味着向后移植 3.8 ThreadedChildWatcher
implementation .import asyncio
import itertools
import logging
import time
import threading
try:
# Python 3.8 or newer has a suitable process watcher
asyncio.ThreadedChildWatcher
except AttributeError:
# backport the Python 3.8 threaded child watcher
import os
import warnings
# Python 3.7 preferred API
_get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)
class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
def __init__(self):
self._pid_counter = itertools.count(0)
self._threads = {}
def is_active(self):
return True
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def __del__(self, _warn=warnings.warn):
threads = [t for t in list(self._threads.values()) if t.is_alive()]
if threads:
_warn(
f"{self.__class__} has registered but not finished child processes",
ResourceWarning,
source=self,
)
def add_child_handler(self, pid, callback, *args):
loop = _get_running_loop()
thread = threading.Thread(
target=self._do_waitpid,
name=f"waitpid-{next(self._pid_counter)}",
args=(loop, pid, callback, args),
daemon=True,
)
self._threads[pid] = thread
thread.start()
def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base class requires it
return True
def attach_loop(self, loop):
pass
def _do_waitpid(self, loop, expected_pid, callback, args):
assert expected_pid > 0
try:
pid, status = os.waitpid(expected_pid, 0)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255", pid
)
else:
if os.WIFSIGNALED(status):
returncode = -os.WTERMSIG(status)
elif os.WIFEXITED(status):
returncode = os.WEXITSTATUS(status)
else:
returncode = status
if loop.get_debug():
logger.debug(
"process %s exited with returncode %s", expected_pid, returncode
)
if loop.is_closed():
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
else:
loop.call_soon_threadsafe(callback, pid, returncode, *args)
self._threads.pop(expected_pid)
# add the watcher to the loop policy
asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
logger = logging.getLogger(__name__)
class EventLoopThread(threading.Thread):
loop = None
_count = itertools.count(0)
def __init__(self):
name = f"{type(self).__name__}-{next(self._count)}"
super().__init__(name=name, daemon=True)
def __repr__(self):
loop, r, c, d = self.loop, False, True, False
if loop is not None:
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
return (
f"<{type(self).__name__} {self.name} id={self.ident} "
f"running={r} closed={c} debug={d}>"
)
def run(self):
self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
try:
shutdown_asyncgens = loop.shutdown_asyncgens()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_asyncgens)
loop.close()
asyncio.set_event_loop(None)
def stop(self):
loop, self.loop = self.loop, None
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
self.join()
_lock = threading.Lock()
_loop_thread = None
def get_event_loop():
global _loop_thread
if _loop_thread is None:
with _lock:
if _loop_thread is None:
_loop_thread = EventLoopThread()
_loop_thread.start()
# give the thread up to a second to produce a loop
deadline = time.time() + 1
while not _loop_thread.loop and time.time() < deadline:
time.sleep(0.001)
return _loop_thread.loop
def stop_event_loop():
global _loop_thread
with _lock:
if _loop_thread is not None:
_loop_thread.stop()
_loop_thread = None
def run_coroutine(coro):
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
以上是我为
Make a Python asyncio call from a Flask route 发布的相同的通用“使用 Flask 运行异步”解决方案,但添加了
ThreadedChildWatcher
向后移植。
get_event_loop()
返回的循环。运行子进程,通过调用
run_coroutine_threadsafe()
:
import asyncio
import locale
import logging
logger = logging.getLogger(__name__)
def get_command_output(cmd, timeout=None):
encoding = locale.getpreferredencoding(False)
async def run_async():
try:
process = await asyncio.create_subprocess_exec(
cmd, stdout=asyncio.subprocess.PIPE)
except OSError:
logging.exception("Process %s could not be started", cmd)
return
async for line in process.stdout:
line = line.decode(encoding)
# TODO: actually do something with the data.
print(line, flush=True)
process.kill()
logging.debug("Process for %s exiting with %i", cmd, process.returncode)
return await process.wait()
future = run_coroutine(run_async())
result = None
try:
result = future.result(timeout)
except asyncio.TimeoutError:
logger.warn('The child process took too long, cancelling the task...')
future.cancel()
except Exception as exc:
logger.exception(f'The child process raised an exception')
return result
请注意,上述函数可能需要超时(以秒为单位),这是您等待子进程完成的最长时间。
关于路由中的 Python3 Flask asyncio 子进程挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58547753/
我是 Linux 的新手,并且继承了保持我们的单一 Linux 服务器运行的职责。这是我们的SVN服务器,所以比较重要。 原来在我之前维护它的人有一个 cron 任务,当有太多 svnserve 进程
Node 虽然自身存在多个线程,但是运行在 v8 上的 JavaScript 是单线程的。Node 的 child_process 模块用于创建子进程,我们可以通过子进程充分利用 CPU。范例:
Jenkins 有这么多进程处于事件状态是否正常? 我检查了我的设置,我只配置了 2 个“执行者”... htop http://d.pr/i/RZzG+ 最佳答案 您不仅要限制 Master 中的执
我正在尝试在 scala 中运行这样的 bash 命令: cat "example file.txt" | grep abc Scala 有一个特殊的流程管道语法,所以这是我的第一个方法: val f
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
我需要一些帮助来理解并发编程的基础知识。事实上,我读得越多,就越感到困惑。因此,我理解进程是顺序执行的程序的一个实例,并且它可以由一个或多个线程组成。在单核CPU中,一次只能执行一个线程,而在多核CP
我的问题是在上一次集成测试后服务器进程没有关闭。 在integration.rs中,我有: lazy_static! { static ref SERVER: Arc> = {
我正在使用 Scala scala.sys.process图书馆。 我知道我可以用 ! 捕获退出代码和输出 !!但是如果我想同时捕获两者呢? 我看过这个答案 https://stackoverflow
我正在开发一个C++类(MyClass.cpp),将其编译为动态共享库(MyClass.so)。 同一台Linux计算机上运行的两个不同应用程序将使用此共享库。 它们是两个不同的应用程序。它不是多线程
我在我的 C 程序中使用 recvfrom() 从多个客户端接收 UDP 数据包,这些客户端可以使用自定义用户名登录。一旦他们登录,我希望他们的用户名与唯一的客户端进程配对,这样服务器就可以通过数据包
如何更改程序,以便函数 function_delayed_1 和 function_delayed_2 仅同时执行一次: int main(int argc, char *argv[]) {
考虑这两个程序: //in #define MAX 50 int main(int argc, char* argv[]) { int *count; int fd=shm
请告诉我如何一次打开三个终端,这样我的项目就可以轻松执行,而不必打开三个终端三次然后运行三个exe文件。请问我们如何通过脚本来做到这一点,即打开三个终端并执行三个 exe 文件。 最佳答案 在后台运行
我编写了一个监控服务来跟踪一组进程,并在服务行为异常、内存使用率高、超出 CPU 运行时间等时发出通知。 这在我的本地计算机上运行良好,但我需要它指向远程机器并获取这些机器上的进程信息。 我的方法,在
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 8年前关闭。 Improve this qu
我有一个允许用户上传文件的应用程序。上传完成后,必须在服务器上完成许多处理步骤(解压、存储、验证等...),因此稍后会在一切完成后通过电子邮件通知用户。 我见过很多示例,其中 System.Compo
这个问题对很多人来说可能听起来很愚蠢,但我想对这个话题有一个清晰的理解。例如:当我们在 linux(ubuntu, x86) 上构建一个 C 程序时,它会在成功编译和链接过程后生成 a.out。 a.
ps -eaf | grep java 命令在这里不是识别进程是否是 java 进程的解决方案,因为执行此命令后我的许多 java 进程未在输出中列出。 最佳答案 简答(希望有人写一个更全面的): 获
我有几个与内核态和用户态的 Windows 进程相关的问题。 如果我有一个 hello world 应用程序和一个暴露新系统调用 foo() 的 hello world 驱动程序,我很好奇在内核模式下
我找不到很多关于 Windows 中不受信任的完整性级别的信息,对此有一些疑问: 是否有不受信任的完整性级别进程可以创建命名对象的地方? (互斥锁、事件等) 不受信任的完整性级别进程是否应该能够打开一
我是一名优秀的程序员,十分优秀!