gpt4 book ai didi

python-3.x - 如何确保按给定的顺序处理所有命令(和错误)

转载 作者:行者123 更新时间:2023-12-05 05:13:42 25 4
gpt4 key购买 nike

TLDR; 我如何制作一个“单文件”asyncio.Queue() 并向它提供我的 adb 命令,让它们按顺序执行接收(一个接一个),处理其中一个任务期间可能发生的错误(断开/重新连接),并在处理错误后继续处理队列的其余部分?


我正在开发一个利用现有 python-adb 的模块模块最终将我的 android 平板电脑作为媒体设备进行控制,并将其整合到我的家庭自动化设置中。

问题:
我的模块完全围绕 async 构建,而 python-adb 模块不是。 python-adb 模块也不管理/限制请求。而且我很快发现,如果请求多个 adb 命令的速度太快,adb 连接就会过载,从而导致错误并在发生断开连接时需要重新连接。

我的一个 friend 设法实现了一个解决方法/hack-y 解决方案。 注意 self._adb_lock & self._adb_error 初始设置在AndroidDevice类的__init__函数。

def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return

self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False

return returns

return _adb_wrapper

通过这种解决方法,我将 @adb_wrapper 装饰器置于所有进行 adb 调用的函数之上。然而,这是非常低效的,而且在高端设备上并不能防止 adb 连接过载。

输入asyncio
首先让我声明一下,目前我对 asyncio 的使用经验很少;因此,挑出哪些已经发布的问题对我有帮助是很重要的。因此,如果答案已经存在于其他地方,我深表歉意。另外,为了让人们了解我的库是如何运行的,代码块会有点冗长,但我只包含了文件的一部分(一些函数来展示我最终是如何交互的)并且我尝试只包括连接以显示命令链的函数。

我的解决方案:
我的目标是能够使用 asyncio 将所有命令排队并让它们一次发送一个,如果在任何时候命令失败(这会导致 adb 断开连接)我想重新建立adb 连接并继续执行命令队列。

当前代码结构:

class AndroidTV:
""" Represents an Android TV device. """

def __init__(self, host, adbkey=''):
""" Initialize AndroidTV object.
:param host: Host in format <address>:port.
:param adbkey: The path to the "adbkey" file
"""
self.host = host
self.adbkey = adbkey
self._adb = None
self.state = STATE_UNKNOWN
self.muted = False
self.device = 'hdmi'
self.volume = 0.
self.app_id = None

self.package_launcher = None
self.package_settings = None

self._adb_error = False
self._adb_lock = False
self._exceptions = (TypeError, ValueError, AttributeError,
InvalidCommandError, InvalidResponseError,
InvalidChecksumError, BrokenPipeError)

@adb_wrapper
async def connect(self):
""" Connect to an Android TV device.
Will attempt to establish ADB connection to the given host.
Failure sets state to UNKNOWN and disables sending actions.
"""
try:
if self.adbkey:
signer = Signer(self.adbkey)

# Connect to the device
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
else:
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)

if not self.package_settings:
self._adb.Shell("am start -a android.settings.SETTINGS")
await asyncio.sleep(1)
logging.info("Getting Settings App Package")
self.package_settings = await self.current_app
if not self.package_launcher:
await self.home()
await asyncio.sleep(1)
logging.info("Getting Launcher App Package")
self.package_launcher = await self.current_app

except socket_error as serr:
logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

@adb_wrapper
async def update(self):
""" Update the device status. """
# Check if device is disconnected.
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
elif not await self._screen_on:
self.state = STATE_OFF
self.app_id = None
else:
self.app_id = await self.current_app

if await self._wake_lock:
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON

# Get information from the audio status.
audio_output = await self._dump('audio')
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'

@property
async def current_app(self):
filtered_dump = await self._dump("window windows", "mCurrentFocus")
current_focus = filtered_dump.replace("\r", "")
matches = WINDOW_REGEX.search(current_focus)
if matches:
(pkg, activity) = matches.group('package', 'activity')
return pkg
else:
logging.warning("Couldn't get current app, reply was %s", current_focus)
return None

@property
async def _screen_on(self):
return await self._dump_has('power', 'Display Power', 'state=ON')

@property
async def _awake(self):
return await self._dump_has('power', 'mWakefulness', 'Awake')

@property
async def _wake_lock(self):
return not await self._dump_has('power', 'Locks', 'size=0')

@adb_wrapper
async def _input(self, cmd):
if not self._adb:
return
self._adb.Shell('input {0}'.format(cmd))

@adb_wrapper
async def _dump(self, service, grep=None):
if not self._adb:
return
if grep:
return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
return self._adb.Shell('dumpsys {0}'.format(service))

async def _dump_has(self, service, grep, search):
dump_result = await self._dump(service, grep=grep)
return dump_result.strip().find(search) > -1

正如我之前所说,上述方法部分有效,但基本上是一种创可贴。

直接进行adb.Shell调用的唯一命令是
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)

connectupdate 函数导致多个 adb.Shell 调用自身,所以这可能是我的问题最终所在。

我的(三部分)问题:
1. 如何在接收到所有命令时对其进行排队?
2. 按照收到的顺序执行?
3. 在任何时候处理错误,重新连接,然后继续执行命令队列的其余部分?

这是我完成此任务的失败尝试。

import asyncio

async def produce_output(queue, commands):
for command in commands:
#execute the adb command
if 'keypress' in command:
#command contains 'input keypress ENTER'
adb.Shell(command)
#mark the task done because there's nothing to process
queue.task_done()
else:
#command contains 'dumpsys audio'
output = adb.Shell(command)
#put result in queue
await queue.put(output)

async def process_adb(queue):
while True:
output = await queue.get()
#return output (somehow?)
queue.task_done()


async def update():
adb_queue = asyncio.Queue()
asyncio.create_task(produce_output(adb_queue,
[self._screen_on,
self.current_app,
self._wake_lock,
self._dump('audio')]))
#Not sure how to proceed

if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
# Fetching result of first item in the queue - self._screen_on
elif not await adb_queue.get():
self.state = STATE_OFF
self.app_id = None
else:
# Fetching result of second item in the queue - self.current_app
self.app_id = await adb_queue.get()

# Fetching result of third item in the queue - self._wake_lock
if await adb_queue.get():
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON

# Get information from the audio status.
# Fetching result of fourth item in the queue - self._dump('audio')
audio_output = await adb_queue.get()
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'

最佳答案

您需要确保在任何给定时间只有一个任务使用 adb 连接来执行命令。这意味着您需要使用 synchronisation primitives协调访问,或 use a queue为单个工作任务提供命令以执行。

接下来,因为 adb 连接是完全同步的,并且与所有 I/O 一样,相对,我会使用thread pool executor在 asyncio 循环之外的 adb 连接上运行操作,这样 asyncio 就可以自由运行一些当前未在 I/O 上阻塞的其他任务。否则,将 .Shell() 命令放入 async def 协程中是没有意义的,您实际上并没有合作并为其他任务的运行腾出空间。

最后但并非最不重要的一点是,如果即使对连接对象进行序列化访问,您发现每个时间段不能接受太多命令,您也会希望使用某种速率限制技术。我有 created an asyncio leaky bucket algorithm implementation before如果需要,可以解决这个问题。

队列或锁都可以确保命令以先到先得的顺序执行,但是队列需要某种延迟响应机制来返回命令结果。队列可以让您对相关命令进行排队(您可以使用 queue.put_nowait() 添加多个条目而不让步,或者您可以允许分组命令),而无需先等待锁定。

因为您想重试连接,所以我将连接对象封装在 asynchronous context manager 中,然后还可以使用执行程序处理锁定和执行命令:

import asyncio
import collections
from concurrent.futures import ThreadPoolExecutor
from functools import partial

try: # Python 3.7
base = contextlib.AbstractAsyncContextManager
except AttributeError:
base = object # type: ignore

_retry_exceptions = (...,) # define exceptions on which to retry commands?

class asyncnullcontext(base):
def __init__(self, enter_result=None):
self.enter_result = enter_result
async def __aenter__(self):
return self.enter_result
async def __aexit__(self, *excinfo):
pass

class AsyncADBConnection(base):
def __init__(
self,
host,
adbkey=None,
rate_limit=None,
max_retry=None,
loop=None
):
self._lock = asyncio.Lock(loop=loop)
self._max_retry = max_retry
self._loop = None
self._connection = None
self._executor = ThreadPoolExecutor()

self._connect_kwargs = {
"serial": host,
"rsa_keys": [Signer(adbkey)] if adbkey else []
}

if rate_limit is not None:
# max commands per second
self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
else:
self._limiter = asyncnullcontext()

async def __aenter__(self):
await self._lock.acquire()
await self._ensure_connection()
return self

async def __aexit__(self):
self._lock.release()

async def _ensure_connection(self):
if self._connection is not None:
return
loop = self._loop or asyncio.get_running_loop()
connector = partial(
adb_commands.AdbCommands().ConnectDevice,
**self._connect_kwargs
)
fut = loop.run_in_executor(pool, connector)
self._connection = await fut

async def shell(self, command):
loop = self._loop or asyncio.get_running_loop()
max_attempts = self._max_retry or 1
attempts = 0
while True:
with self._limiter:
try:
fut = loop.run_in_executor(
self._executor,
self._connection.Shell,
command
)
return await fut
except _retry_exceptions as e:
attempts += 1
if attempts >= max_attempts:
raise
# re-connect on retry
self._connection = None
await self._ensure_connection()

如果您随后使用队列,请使用 Future() instances交流结果。

然后将作业插入队列变为:

fut = asyncio.Future()
await queue.put((command, fut))
result = await fut

您可以将其包装到实用函数或对象中。 await fut 行仅在 future 收到结果后返回。对于你不关心结果的命令,如果你想确保命令完成,你只需要 await

管理连接的工作任务中的消费者将使用:

while True:
command, fut = await self.queue.get():
async with self.connection as conn:
response = await conn.shell(command)
fut.set_result(response)
self.queue.task_done() # optional, only needed when joining the queue

其中 self.connection 是一个 AsyncADBConnection 实例。

关于python-3.x - 如何确保按给定的顺序处理所有命令(和错误),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53212210/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com