- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
来自 doc :
write(data)
Write data to the stream.
This method is not subject to flow control. Calls to write() should be followed by drain().coroutine drain()
Wait until it is appropriate to resume writing to the stream. Example:
writer.write(data)
await writer.drain()
据我了解,
write
时都需要调用drain
。write
会阻塞循环线程那write为什么不是自动调用的协程呢?为什么一个调用 write
而不必耗尽?我可以想到两种情况
编写
并关闭
第一个是特例,我想我们可以有不同的 API。缓冲应该在写函数内部处理,应用程序不应该关心。
让我以不同的方式提出这个问题。这样做的缺点是什么? python3.8版本有效吗?
async def awrite(writer, data):
writer.write(data)
await writer.drain()
注意:drain
文档明确说明如下:
When there is nothing to wait for, the
drain()
returns immediately.
再次阅读答案和链接,我认为这些功能是这样工作的。 注意:检查已接受的答案以获得更准确的版本。
def write(data):
remaining = socket.try_write(data)
if remaining:
_pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data
async def drain():
if len(_pendingbuffer) < BUF_LIMIT:
return
await wait_until_other_side_is_up_to_speed()
assert len(_pendingbuffer) < BUF_LIMIT
async def awrite(writer, data):
writer.write(data)
await writer.drain()
那么什么时候使用什么:
write
awrite
awrite
。如果文件很大,loop.sendfile
如果可用的话会更好。最佳答案
From what I understand, (1) You need to call drain every time write is called. (2) If not I guess, write will block the loop thread
两者都不正确,但混淆是可以理解的。 write()
的工作方式如下:
对 write()
的调用只是将数据存储到缓冲区,将其留给事件循环以在稍后实际写出,而无需程序进一步干预.就应用程序而言,数据在后台写入的速度与另一方接收数据的速度一样快。换句话说,每个 write()
都会安排它的数据使用尽可能多的操作系统级写入来传输,这些写入在相应的文件描述符实际可写时发出。所有这一切都会自动发生,甚至无需等待 drain()
。
write()
不是协程,它绝对从不阻塞事件循环。
第二个属性听起来很方便——你可以在任何需要的地方调用write()
,甚至是从一个不是async def
的函数调用——但它实际上是一个主要的write() 的>缺陷。流 API 公开的写入与接受数据的操作系统完全分离,因此如果您写入数据的速度快于网络对等体读取数据的速度,内部缓冲区将不断增长,您将拥有 memory leak。在你的手上。 drain()
修复了这个问题:如果写入缓冲区变得太大,等待它会暂停协程,并在 os.write()
执行后再次恢复协程后台成功,缓冲区缩小。
您不需要在每次 写入之后等待drain()
,但您确实需要偶尔等待它,通常是在write()
被调用。例如:
while True:
response = await peer1.readline()
peer2.write(b'<response>')
peer2.write(response)
peer2.write(b'</response>')
await peer2.drain()
drain()
如果待处理的未写入数据量很小,则立即返回。如果数据超过高阈值,drain()
将暂停调用协程,直到待处理的未写入数据量降至低阈值以下。暂停将导致协程停止从 peer1
读取数据,这又会导致对等方减慢它向我们发送数据的速率。这种反馈称为背压。
Buffering should be handled inside write function and application should not care.
这几乎就是 write()
现在的工作方式 - 它确实处理缓冲并且让应用程序不关心,无论是好是坏。另见 this answer了解更多信息。
Reading the answer and links again, I think the the functions work like this.
write()
还是比那个聪明一点。它不会尝试只写一次,它实际上会安排数据继续写入,直到没有数据可写为止。即使您从不 await drain()
也会发生这种情况 - 应用程序唯一必须做的就是让事件循环运行足够长的时间以写出所有内容。
一个更正确的write
和drain
伪代码可能是这样的:
class ToyWriter:
def __init__(self):
self._buf = bytearray()
self._empty = asyncio.Event(True)
def write(self, data):
self._buf.extend(data)
loop.add_writer(self._fd, self._do_write)
self._empty.clear()
def _do_write(self):
# Automatically invoked by the event loop when the
# file descriptor is writable, regardless of whether
# anyone calls drain()
while self._buf:
try:
nwritten = os.write(self._fd, self._buf)
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
return # continue once we're writable again
raise
self._buf = self._buf[nwritten:]
self._empty.set()
loop.remove_writer(self._fd, self._do_write)
async def drain(self):
if len(self._buf) > 64*1024:
await self._empty.wait()
实际实现起来比较复杂,因为:
os.write
;drain()
并没有真正等到缓冲区为空,而是直到它到达 low watermark。 ;_do_write
中引发的 EWOULDBLOCK
以外的异常被存储并在 drain()
中重新引发。最后一点是调用 drain()
的另一个很好的理由 - 实际注意到对等点由于写入失败而消失了。
关于python - 为什么要显式调用 asyncio.StreamWriter.drain?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53779956/
我正在我的一个项目中使用 aiohttp 并想限制每秒发出的请求数。我正在使用 asyncio.Semaphore 来做到这一点。我的挑战是我可能想要增加/减少每秒允许的请求数。 例如: limit
如何混合 async with api.open() as o: ... 和 o = await api.open() 在一个功能中? 自从第一次需要带有 __aenter__ 的对象以来和
有 2 个工作:“wash_clothes”(job1) 和“setup_cleaning_robot”(job2),每个工作需要你 7 和 3 秒,你必须做到世界末日。 这是我的代码: import
我们有一种设置线程名称的方法:thread = threading.Thread(name='Very important thread', target=foo),然后在格式化程序中使用 %(thr
我有一些代码,用于抓取 URL、解析信息,然后使用 SQLAlchemy 将其放入数据库中。我尝试异步执行此操作,同时限制同时请求的最大数量。 这是我的代码: async def get_url(ai
1>Python Asyncio 未使用 asyncio.run_coroutine_threadsafe 运行新的协程。下面是在Mac上进行的代码测试。 ——————————————————————
asyncio.gather和 asyncio.wait似乎有类似的用途:我有一堆我想要执行/等待的异步事情(不一定要在下一个开始之前等待一个完成)。它们使用不同的语法,并且在某些细节上有所不同,但对
我正在尝试使用 asyncio 运行以下程序: import asyncio async def main(): print('Hello') await asyncio.sleep(
我正在尝试在事件循环之外使用协程函数。 (在这种情况下,我想在 Django 中调用一个也可以在事件循环中使用的函数) 如果不使调用函数成为协程,似乎没有办法做到这一点。 我意识到 Django 是为
我有一个假设 asyncio.gather设想: await asyncio.gather( cor1, [cor2, cor3], cor4, ) 我要 cor2和 cor3
我有多个服务器,每个服务器都是 asyncio.start_server 返回的实例。我需要我的 web_server 与 websockets 一起使用,以便能够使用我的 javascript 客户
我正在使用 Python 3 asyncio 框架评估定期执行的不同模式(为简洁起见省略了实际 sleep /延迟),我有两段代码表现不同,我无法解释原因。第一个版本使用 yield from 递归调
从事件线程外部将协程推送到事件线程的 pythonic 方法是什么? 最佳答案 更新信息: 从Python 3.7 高级函数asyncio.create_task(coro)开始was added并且
我有一个大型 (1M) 数据库结果集,我想为其每一行调用一个 REST API。 API 可以接受批处理请求,但我不确定如何分割 rows 生成器,以便每个任务处理一个行列表,比如 10。我宁愿不预先
迷失在异步中。 我同时在学习Kivy和asyncio,卡在了解决运行Kivy和运行asyncio循环的问题上,无论怎么转,都是阻塞调用,需要顺序执行(好吧,我希望我是错的),例如 loop = asy
我有这个 3.6 异步代码: async def send(command,userPath,token): async with websockets.connect('wss://127.
首先,我需要警告你:我是 asyncio 的新手,而且我是 我马上警告你,我是 asyncio 的新手,我很难想象引擎盖下的库里有什么。 这是我的代码: import asyncio semaphor
我有一个asyncio.PriorityQueue,用作网络爬虫的URL队列,当我调用url_queue.get时,得分最低的URL首先从队列中删除()。当队列达到 maxsize 项时,默认行为是阻
探索 Python 3.4.0 的 asyncio 模块,我试图创建一个类,其中包含从类外部的 event_loop 调用的 asyncio.coroutine 方法。 我的工作代码如下。 impor
我有一个可能是无用的问题,但尽管如此,我还是觉得我错过了一些对于理解 asyncio 的工作方式可能很重要的东西。 我刚刚开始熟悉 asyncio 并编写了这段非常基本的代码: import asyn
我是一名优秀的程序员,十分优秀!