- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
TLDR; 我如何制作一个“单文件”asyncio.Queue()
并向它提供我的 adb 命令,让它们按顺序执行接收(一个接一个),处理其中一个任务期间可能发生的错误(断开/重新连接),并在处理错误后继续处理队列的其余部分?
我正在开发一个利用现有 python-adb 的模块模块最终将我的 android 平板电脑作为媒体设备进行控制,并将其整合到我的家庭自动化设置中。
问题:
我的模块完全围绕 async
构建,而 python-adb
模块不是。 python-adb
模块也不管理/限制请求。而且我很快发现,如果请求多个 adb 命令的速度太快,adb 连接就会过载,从而导致错误并在发生断开连接时需要重新连接。
我的一个 friend 设法实现了一个解决方法/hack-y 解决方案。 注意 self._adb_lock
& self._adb_error
初始设置在AndroidDevice
类的__init__
函数。
def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return
self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False
return returns
return _adb_wrapper
通过这种解决方法,我将 @adb_wrapper
装饰器置于所有进行 adb 调用的函数之上。然而,这是非常低效的,而且在高端设备上并不能防止 adb 连接过载。
输入asyncio
首先让我声明一下,目前我对 asyncio
的使用经验很少;因此,挑出哪些已经发布的问题对我有帮助是很重要的。因此,如果答案已经存在于其他地方,我深表歉意。另外,为了让人们了解我的库是如何运行的,代码块会有点冗长,但我只包含了文件的一部分(一些函数来展示我最终是如何交互的)并且我尝试只包括连接以显示命令链的函数。
我的解决方案:
我的目标是能够使用 asyncio
将所有命令排队并让它们一次发送一个,如果在任何时候命令失败(这会导致 adb 断开连接)我想重新建立adb 连接并继续执行命令队列。
当前代码结构:
class AndroidTV:
""" Represents an Android TV device. """
def __init__(self, host, adbkey=''):
""" Initialize AndroidTV object.
:param host: Host in format <address>:port.
:param adbkey: The path to the "adbkey" file
"""
self.host = host
self.adbkey = adbkey
self._adb = None
self.state = STATE_UNKNOWN
self.muted = False
self.device = 'hdmi'
self.volume = 0.
self.app_id = None
self.package_launcher = None
self.package_settings = None
self._adb_error = False
self._adb_lock = False
self._exceptions = (TypeError, ValueError, AttributeError,
InvalidCommandError, InvalidResponseError,
InvalidChecksumError, BrokenPipeError)
@adb_wrapper
async def connect(self):
""" Connect to an Android TV device.
Will attempt to establish ADB connection to the given host.
Failure sets state to UNKNOWN and disables sending actions.
"""
try:
if self.adbkey:
signer = Signer(self.adbkey)
# Connect to the device
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
else:
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
if not self.package_settings:
self._adb.Shell("am start -a android.settings.SETTINGS")
await asyncio.sleep(1)
logging.info("Getting Settings App Package")
self.package_settings = await self.current_app
if not self.package_launcher:
await self.home()
await asyncio.sleep(1)
logging.info("Getting Launcher App Package")
self.package_launcher = await self.current_app
except socket_error as serr:
logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)
@adb_wrapper
async def update(self):
""" Update the device status. """
# Check if device is disconnected.
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
elif not await self._screen_on:
self.state = STATE_OFF
self.app_id = None
else:
self.app_id = await self.current_app
if await self._wake_lock:
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
audio_output = await self._dump('audio')
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
@property
async def current_app(self):
filtered_dump = await self._dump("window windows", "mCurrentFocus")
current_focus = filtered_dump.replace("\r", "")
matches = WINDOW_REGEX.search(current_focus)
if matches:
(pkg, activity) = matches.group('package', 'activity')
return pkg
else:
logging.warning("Couldn't get current app, reply was %s", current_focus)
return None
@property
async def _screen_on(self):
return await self._dump_has('power', 'Display Power', 'state=ON')
@property
async def _awake(self):
return await self._dump_has('power', 'mWakefulness', 'Awake')
@property
async def _wake_lock(self):
return not await self._dump_has('power', 'Locks', 'size=0')
@adb_wrapper
async def _input(self, cmd):
if not self._adb:
return
self._adb.Shell('input {0}'.format(cmd))
@adb_wrapper
async def _dump(self, service, grep=None):
if not self._adb:
return
if grep:
return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
return self._adb.Shell('dumpsys {0}'.format(service))
async def _dump_has(self, service, grep, search):
dump_result = await self._dump(service, grep=grep)
return dump_result.strip().find(search) > -1
正如我之前所说,上述方法部分有效,但基本上是一种创可贴。
直接进行adb.Shell
调用的唯一命令是
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)
connect
和 update
函数导致多个 adb.Shell
调用自身,所以这可能是我的问题最终所在。
我的(三部分)问题:
1. 如何在接收到所有命令时对其进行排队?
2. 按照收到的顺序执行?
3. 在任何时候处理错误,重新连接,然后继续执行命令队列的其余部分?
import asyncio
async def produce_output(queue, commands):
for command in commands:
#execute the adb command
if 'keypress' in command:
#command contains 'input keypress ENTER'
adb.Shell(command)
#mark the task done because there's nothing to process
queue.task_done()
else:
#command contains 'dumpsys audio'
output = adb.Shell(command)
#put result in queue
await queue.put(output)
async def process_adb(queue):
while True:
output = await queue.get()
#return output (somehow?)
queue.task_done()
async def update():
adb_queue = asyncio.Queue()
asyncio.create_task(produce_output(adb_queue,
[self._screen_on,
self.current_app,
self._wake_lock,
self._dump('audio')]))
#Not sure how to proceed
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
# Fetching result of first item in the queue - self._screen_on
elif not await adb_queue.get():
self.state = STATE_OFF
self.app_id = None
else:
# Fetching result of second item in the queue - self.current_app
self.app_id = await adb_queue.get()
# Fetching result of third item in the queue - self._wake_lock
if await adb_queue.get():
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
# Fetching result of fourth item in the queue - self._dump('audio')
audio_output = await adb_queue.get()
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
最佳答案
您需要确保在任何给定时间只有一个任务使用 adb
连接来执行命令。这意味着您需要使用 synchronisation primitives协调访问,或 use a queue为单个工作任务提供命令以执行。
接下来,因为 adb
连接是完全同步的,并且与所有 I/O 一样,相对慢,我会使用thread pool executor在 asyncio 循环之外的 adb
连接上运行操作,这样 asyncio 就可以自由运行一些当前未在 I/O 上阻塞的其他任务。否则,将 .Shell()
命令放入 async def
协程中是没有意义的,您实际上并没有合作并为其他任务的运行腾出空间。
最后但并非最不重要的一点是,如果即使对连接对象进行序列化访问,您发现每个时间段不能接受太多命令,您也会希望使用某种速率限制技术。我有 created an asyncio leaky bucket algorithm implementation before如果需要,可以解决这个问题。
队列或锁都可以确保命令以先到先得的顺序执行,但是队列需要某种延迟响应机制来返回命令结果。队列可以让您对相关命令进行排队(您可以使用 queue.put_nowait()
添加多个条目而不让步,或者您可以允许分组命令),而无需先等待锁定。
因为您想重试连接,所以我将连接对象封装在 asynchronous context manager 中,然后还可以使用执行程序处理锁定和执行命令:
import asyncio
import collections
from concurrent.futures import ThreadPoolExecutor
from functools import partial
try: # Python 3.7
base = contextlib.AbstractAsyncContextManager
except AttributeError:
base = object # type: ignore
_retry_exceptions = (...,) # define exceptions on which to retry commands?
class asyncnullcontext(base):
def __init__(self, enter_result=None):
self.enter_result = enter_result
async def __aenter__(self):
return self.enter_result
async def __aexit__(self, *excinfo):
pass
class AsyncADBConnection(base):
def __init__(
self,
host,
adbkey=None,
rate_limit=None,
max_retry=None,
loop=None
):
self._lock = asyncio.Lock(loop=loop)
self._max_retry = max_retry
self._loop = None
self._connection = None
self._executor = ThreadPoolExecutor()
self._connect_kwargs = {
"serial": host,
"rsa_keys": [Signer(adbkey)] if adbkey else []
}
if rate_limit is not None:
# max commands per second
self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
else:
self._limiter = asyncnullcontext()
async def __aenter__(self):
await self._lock.acquire()
await self._ensure_connection()
return self
async def __aexit__(self):
self._lock.release()
async def _ensure_connection(self):
if self._connection is not None:
return
loop = self._loop or asyncio.get_running_loop()
connector = partial(
adb_commands.AdbCommands().ConnectDevice,
**self._connect_kwargs
)
fut = loop.run_in_executor(pool, connector)
self._connection = await fut
async def shell(self, command):
loop = self._loop or asyncio.get_running_loop()
max_attempts = self._max_retry or 1
attempts = 0
while True:
with self._limiter:
try:
fut = loop.run_in_executor(
self._executor,
self._connection.Shell,
command
)
return await fut
except _retry_exceptions as e:
attempts += 1
if attempts >= max_attempts:
raise
# re-connect on retry
self._connection = None
await self._ensure_connection()
如果您随后使用队列,请使用 Future()
instances交流结果。
然后将作业插入队列变为:
fut = asyncio.Future()
await queue.put((command, fut))
result = await fut
您可以将其包装到实用函数或对象中。 await fut
行仅在 future 收到结果后返回。对于你不关心结果的命令,如果你想确保命令完成,你只需要 await
。
管理连接的工作任务中的消费者将使用:
while True:
command, fut = await self.queue.get():
async with self.connection as conn:
response = await conn.shell(command)
fut.set_result(response)
self.queue.task_done() # optional, only needed when joining the queue
其中 self.connection
是一个 AsyncADBConnection
实例。
关于python-3.x - 如何确保按给定的顺序处理所有命令(和错误),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53212210/
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 9 年前。 Improve this
我有一系列 SQL 命令,我想在大约 40 个不同的表上运行。必须有一种方法可以在不编写 40 条不同命令的情况下执行此操作... 我在 SQL Server 中运行它。所有表都有不同的名称,我要操作
我习惯在 PHP 中使用命令“mysql_insert_id()”来返回插入到我的数据库中的最后一行的 id。 在 C# 中的 SQLite 中是否有等效的命令? 谢谢! -阿德娜 最佳答案 选择 l
试图找出一种方法来回填 ds 分区 Hive 表的分区。 我知道如何从 CLI 运行 Hive 命令,例如 $HIVE_HOME/bin/hive -e 'select a.col from tab1
我有 .bat 文件。看起来像下一个 ....many commands1 ftp -i -s:copy.txt ...many commands2 copy.txt 包含下一个命令 open ...
基本上我想输入 show 并检查是否有 show 命令或别名已定义并触发它,如果未定义则触发 git show 。 例如 rm 应该执行 rm 但 checkout 应该执行 git checkout
我公司的主数据库是 iSeries 机器,我已经非常习惯使用 DB2 命令和结构。我现在正在尝试做一个小项目,更新一个包含超过 300 万条记录的表。我想出一种比较和“清理”数据的更快方法是使用 My
我想在带有 Node 的终端中制作一个简单的按钮板,并“blessed”用于连接或运行不同的命令。 ----------------------------------------------- _
我们有一个 selenium IDE 脚本,正在转换为 python webdriver。以下命令未转换: [openWindow | http://mywebsite.com/index.php |
我正在学习这个关于从 GIT HUB 下载和安装 Web 文件的在线教程。我进入主题:启动我们的静态网站,系统提示我输入命令以下载和安装 Web 文件。但是,当我输入命令 yarn install 时
我在 shell 脚本中使用 elif 命令时遇到问题,就像在 fortran 中一样。 我有 100 家公司的员工名单。我想屏蔽那些员工少于 500 人的公司。我的脚本是 rm -f categor
我有一些 Linux 命令可以生成 token 。我在 Linux 机器上使用操作系统库形式的 Python 自动化了这些命令。它工作正常。 但是,当我在 Windows 中尝试相同的代码时,它没有返
本文分享自华为云社区《Git你有可能不知道交互式暂存》,作者:龙哥手记。 本节中的几个交互式 Git 命令可以帮助你将文件的特定部分组合成提交。 当你在修改了大量文件后,希望这些改动能拆分为若干提交而
我想知道如何使用 IN 比较语法来做到这一点。 当前的 SQL 查询是: select * from employee where (employeeName = 'AJAY' and month(e
我在这个位置安装了 Hadoop /usr/local/hadoop$ 现在我想列出 dfs 中的文件。我使用的命令是: hduser@ubuntu:/usr/local/hadoop$ bin/ha
是否有一个单一的 docker 命令可用于清除所有内容?如果正在运行,请停止所有容器、删除所有图像、删除所有卷...等。 最佳答案 我认为没有一个命令可以做到这一点。您首先需要停止所有容器使用 $ d
我基本上是在 clojure/nrepl 模式中寻找与 C-u C-x C-e 或 C-c C-p 等效的 Scheme。 我想要一个 C-x C-e 将输出打印到缓冲区,而不是仅仅在 repl 中。
我可以在 vim 中使用 pudb(一个 ncurses Python 调试器),因为,例如,:!python %在实际的终端窗口中运行。我更喜欢使用 gvim,但 gvim 运行 :!python
我正在尝试编写一个 FFMPEG 命令: 取为 输入 一个视频 input.mp4 和一个图像 pic.jpg 作为 输出 将 input.mp4 拆分为 20 秒的视频,按顺序重命名;对于每个分割视
我想转储视频每帧的比特率。我正在尝试使用 -vstats 获取此信息命令。当我运行此命令时 - ffmpeg -i input.mp4 -vstats 它显示至少应该定义一个文件。 如果有人能建议我任
我是一名优秀的程序员,十分优秀!