
python - Non-blocking real-time read from multiple shell subprocesses (Python)


I am building real-time monitoring of multiple video streams using ffmpeg and subprocess. I currently have the following code, inspired by the "Async and await with subprocesses" post.

The problem is that after a while the output stops printing and the processes go into zombie mode. My guess is that this is related to a PIPE overflow or a deadlock. Help needed.

"""Async and await example using subprocesses

Note:
Requires Python 3.6.
"""

import os
import sys
import time
import platform
import asyncio

async def run_command_shell(command):
"""Run command in subprocess (shell)

Note:
This can be used if you wish to execute e.g. "copy"
on Windows, which can only be executed in the shell.
"""
# Create subprocess
process = await asyncio.create_subprocess_shell(
command,
stderr=asyncio.subprocess.PIPE)

# Status
print('Started:', command, '(pid = ' + str(process.pid) + ')')

# Wait for the subprocess to finish
stdout, stderr = await process.communicate()

# Progress
if process.returncode == 0:
print('Done:', command, '(pid = ' + str(process.pid) + ')')
else:
print('Failed:', command, '(pid = ' + str(process.pid) + ')')

# Result
result = stderr.decode().strip()

# Real time print
print(result)

# Return stdout
return result


def make_chunks(l, n):
"""Yield successive n-sized chunks from l.

Note:
Taken from https://stackoverflow.com/a/312464
"""
if sys.version_info.major == 2:
for i in xrange(0, len(l), n):
yield l[i:i + n]
else:
# Assume Python 3
for i in range(0, len(l), n):
yield l[i:i + n]


def run_asyncio_commands(tasks, max_concurrent_tasks=0):
"""Run tasks asynchronously using asyncio and return results

If max_concurrent_tasks are set to 0, no limit is applied.

Note:
By default, Windows uses SelectorEventLoop, which does not support
subprocesses. Therefore ProactorEventLoop is used on Windows.
https://docs.python.org/3/library/asyncio-eventloops.html#windows
"""

all_results = []

if max_concurrent_tasks == 0:
chunks = [tasks]
else:
chunks = make_chunks(l=tasks, n=max_concurrent_tasks)

for tasks_in_chunk in chunks:
if platform.system() == 'Windows':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()

commands = asyncio.gather(*tasks_in_chunk) # Unpack list using *
results = loop.run_until_complete(commands)
all_results += results
loop.close()
return all_results


if __name__ == '__main__':

start = time.time()

if platform.system() == 'Windows':
# Commands to be executed on Windows
commands = [
['hostname']
]
else:
# Commands to be executed on Unix
commands = [
['du', '-sh', '/var/tmp'],
['hostname'],
]
cmds = [["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"]]

tasks = []
for command in cmds:
tasks.append(run_command_shell(*command))


# # Shell execution example
# tasks = [run_command_shell('copy c:/somefile d:/new_file')]

# # List comprehension example
# tasks = [
# run_command(*command, get_project_path(project))
# for project in accessible_projects(all_projects)
# ]

results = run_asyncio_commands(tasks, max_concurrent_tasks=20) # At most 20 parallel tasks
print('Results:', results)

end = time.time()
rounded_end = ('{0:.4f}'.format(round(end-start,4)))
print('Script ran in about', str(rounded_end), 'seconds')
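For reference, the output in this script is only printed once communicate() returns, i.e. after the subprocess exits. A minimal sketch of a genuinely real-time variant (my own assumption, not part of the original post; the function name run_command_shell_streaming is hypothetical) would read the stderr pipe line by line with the same asyncio subprocess API:

import asyncio


async def run_command_shell_streaming(command):
    # Hypothetical variant: print each stderr line as soon as ffmpeg writes it,
    # instead of collecting everything in communicate().
    process = await asyncio.create_subprocess_shell(
        command,
        stderr=asyncio.subprocess.PIPE)
    print('Started:', command, '(pid = ' + str(process.pid) + ')')

    while True:
        line = await process.stderr.readline()
        if not line:            # EOF: the subprocess closed its stderr
            break
        print('[pid %d]' % process.pid, line.decode().rstrip())

    return await process.wait()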

Related: Non-blocking read from multiple subprocesses (Python)

Best Answer

It turned out that the problem most likely had nothing to do with the code or with how it parallelizes work via async/threading.

The cause may be a server-side limit, e.g. the maximum number of open files / file descriptors (FD), a firewall, or some other configuration.

If you stumble upon a similar problem:


Install htop

Htop is an interactive, real-time process monitoring application for Linux/Unix-like systems and a handy alternative to the top command, the default process monitoring tool that ships with Linux systems.

This may help clarify the cause.
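If you prefer to check for zombies from a script rather than interactively, a stdlib-only sketch like the one below (my own addition, Linux-specific; it parses the state field of /proc/<pid>/stat) reports the same STATE information that htop shows:

import os

# Sketch: list processes the kernel reports as zombies (state 'Z').
for pid in filter(str.isdigit, os.listdir('/proc')):
    try:
        with open('/proc/%s/stat' % pid) as f:
            stat = f.read()
    except OSError:
        continue  # the process exited while we were scanning
    # The state field is the first token after the closing ')' of the comm field.
    state = stat.rsplit(')', 1)[1].split()[0]
    if state == 'Z':
        print('Zombie process:', pid, stat[stat.index('(') + 1:stat.rindex(')')])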


Test a single ffmpeg command

As jfs said, I needed a Minimal, Complete, and Verifiable example. So let's start very small and test a single process:

ffmpeg -y -i udp://224.10.0.123:1234  -f null -

In my case it turned out that any multicast stream would hang after 2:10 - 2:20 minutes: the process stayed alive but in zombie mode. This was very strange, because a couple of days earlier everything had worked perfectly.
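To catch that state automatically rather than by watching the terminal, one option (my own sketch, not from the original answer; watch_stream is a hypothetical helper) is to treat a long silence on ffmpeg's stderr as a stall and kill the process:

import asyncio


async def watch_stream(command, stall_timeout=30):
    # Sketch: kill ffmpeg if its stderr goes silent for `stall_timeout` seconds,
    # which is how the hang described above shows up.
    process = await asyncio.create_subprocess_shell(
        command, stderr=asyncio.subprocess.PIPE)
    try:
        while True:
            line = await asyncio.wait_for(process.stderr.readline(),
                                          timeout=stall_timeout)
            if not line:        # EOF: ffmpeg exited on its own
                break
            print(line.decode().rstrip())
    except asyncio.TimeoutError:
        print('No output for %d s, killing pid %d' % (stall_timeout, process.pid))
        process.kill()
    return await process.wait()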


Test other software (VLC's multicat)

The latest official release of Multicat is version 2.2, published here.

Grab it, and don't forget that biTStream needs to be installed before building.

Check the stream by recording some video from it with:

timeout 10 multicat -uU @224.10.0.123:1234 test.ts

In my case the same thing happened in the second minute: the command kept running, but the file stopped being written.


Check the maximum number of open files / file descriptors (more info)

Display the system-wide maximum number of open file descriptors with:

cat /proc/sys/fs/file-max

To see the hard and soft limits for the current user, issue:

ulimit -Hn
ulimit -Sn

At some point during the execution of one of my Python scripts I did see a similar error, but increasing this limit did not help me.
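For completeness, the soft limit can also be inspected and raised from inside the Python script itself (a small stdlib sketch of my own, Unix-only; as noted above, this did not help in my case because the limit was not the real cause):

import resource

# Sketch: check and raise the soft file-descriptor limit from Python (Unix).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print('Open-file limits: soft =', soft, 'hard =', hard)

# An unprivileged process may raise its soft limit up to the hard limit.
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))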


Summary

So the problem was not related to the parallel execution of my script; it ran fine on another VM. I contacted the person who had set up this VM, explained that the problem had appeared only in the last few days, and suggested that the firewall was the cause. He said he hadn't touched anything, but after that call everything started working perfectly again. (I'm almost certain he was the one who broke it) :D

GL everyone!

Regarding "python - Non-blocking real-time read from multiple shell subprocesses (Python)", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48618709/
