
python - Reading streaming output from an async subprocess

Reposted — Author: 行者123 · Updated: 2023-12-01 06:33:04

I'm trying to read URLs from a program running in a subprocess and then schedule asynchronous HTTP requests, but it looks like the requests are running synchronously. Is that because the subprocess and the requests are both running in the same coroutine function?

test.py

import random
import time

URLS = ['http://example.com', 'http://example.com/sleep5s']

def main():
    for url in random.choices(URLS, weights=(1, 1), k=5):
        print(url)
        time.sleep(random.uniform(0.5, 1))


if __name__ == '__main__':
    main()

main.py

import asyncio
import sys

import httpx

from httpx.exceptions import TimeoutException


async def req(url):
    async with httpx.AsyncClient() as client:
        try:
            r = await client.get(url, timeout=2)
            print(f'Response {url}: {r.status_code}')
        except TimeoutException:
            print(f'TIMEOUT - {url}')
        except Exception as exc:
            print(f'ERROR - {url}: {exc}')


async def run():
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        '-u',
        'test.py',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    while True:
        line = await proc.stdout.readline()
        if not line:
            break

        url = line.decode().rstrip()
        print(f'Found URL: {url}')

        resp = await req(url)

    await proc.wait()


async def main():
    await run()


if __name__ == '__main__':
    asyncio.run(main())

Test run

$ python main.py
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s

Best Answer

it looks like the requests are running synchronously. Is that because subprocess and requests are both running in the same coroutine function?

Your diagnosis is correct. await means exactly what it says: the coroutine does not proceed until it has the result. Fortunately, asyncio makes it easy to run a coroutine in the background:

tasks = []
while True:
    line = await proc.stdout.readline()
    if not line:
        break

    url = line.decode().rstrip()
    print(f'Found URL: {url}')

    tasks.append(asyncio.create_task(req(url)))

resps = await asyncio.gather(*tasks)
await proc.wait()

Note that:

  • asyncio.create_task() ensures the requests start being processed even while we are still reading the lines
  • asyncio.gather() ensures the tasks are all actually awaited before the coroutine completes. It also provides access to the responses and propagates exceptions, if any.
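The effect of the two points above can be seen in isolation with a minimal sketch. Here asyncio.sleep() stands in for each HTTP request (the fake_req helper and the delay values are illustrative, not part of the original code): all "requests" are scheduled up front with create_task(), then collected with gather(), so the total elapsed time is roughly one delay, not the sum of all of them.

```python
import asyncio
import time

async def fake_req(delay):
    # Stand-in for req(url): just waits, then returns its delay.
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.monotonic()
    # Schedule all "requests" immediately, as the fixed loop does.
    tasks = [asyncio.create_task(fake_req(d)) for d in (0.2, 0.2, 0.2)]
    # gather() awaits them all and returns their results in order.
    results = await asyncio.gather(*tasks)
    elapsed = time.monotonic() - start
    # Concurrent: total time is ~0.2s, not 0.6s.
    print(f'results={results} elapsed={elapsed:.2f}s')
    return results

if __name__ == '__main__':
    asyncio.run(main())
```

Replacing create_task() with a plain await on each fake_req() call would bring the elapsed time back up to the sum of the delays, which is exactly the serialization the question describes.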

For python - reading streaming output from an async subprocess, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/59801263/
