gpt4 book ai didi

python - 如何在 python 代码段上应用并行或异步 I/O 文件写入

转载 作者:太空狗 更新时间:2023-10-29 21:39:26 25 4
gpt4 key购买 nike

首先,我们得到以下代码:

from validate_email import validate_email
import time
import os

def verify_emails(email_path, good_filepath, bad_filepath):
good_emails = open(good_filepath, 'w+')
bad_emails = open(bad_filepath, 'w+')

emails = set()

with open(email_path) as f:
for email in f:
email = email.strip()

if email in emails:
continue
emails.add(email)

if validate_email(email, verify=True):
good_emails.write(email + '\n')
else:
bad_emails.write(email + '\n')

if __name__ == "__main__":
os.system('cls')
verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")

emails.txt 包含大量行 (>1k) 时,我预计联系 SMTP 服务器是我程序中成本最高的部分。使用某种形式的并行或异步 I/O 应该可以大大加快速度,因为我可以等待多个服务器响应而不是顺序等待。

据我所知:

Asynchronous I/O operates by queuing a request for I/O to the file descriptor, tracked independently of the calling process. For a file descriptor that supports asynchronous I/O (raw disk devcies typically), a process can call aio_read() (for instance) to request a number of bytes be read from the file descriptor. The system call returns immediately, whether or not the I/O has completed. Some time later, the process then polls the operating system for the completion of the I/O (that is, buffer is filled with data).

老实说,我不太明白如何在我的程序中实现异步 I/O。谁能花一点时间向我解释一下整个过程?


EDIT 按照 PArakleta 的建议:

from validate_email import validate_email
import time
import os
from multiprocessing import Pool
import itertools

def validate_map(e):
return (validate_email(e.strip(), verify=True), e)

seen_emails = set()
def unique(e):
if e in seen_emails:
return False
seen_emails.add(e)
return True

def verify_emails(email_path, good_filepath, bad_filepath):
good_emails = open(good_filepath, 'w+')
bad_emails = open(bad_filepath, 'w+')

with open(email_path, "r") as f:
for result in Pool().imap_unordered(validate_map,
itertools.ifilter(unique, f):
(good, email) = result
if good:
good_emails.write(email)
else:
bad_emails.write(email)
good_emails.close()
bad_emails.close()

if __name__ == "__main__":
os.system('cls')
verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")

最佳答案

你问错问题了

查看了 validate_email 包后,您真正的问题是您没有有效地对结果进行批处理。您应该只为每个域执行一次 MX 查找,然后只连接到每个 MX 服务器一次,通过握手,然后在一个批处理中检查该服务器的所有地址。值得庆幸的是,validate_email 包为您缓存了 MX 结果,但您仍然需要按服务器对电子邮件地址进行分组,以便将查询批量发送到服务器本身。

您需要编辑 validate_email 包来实现批处理,然后可能使用实际的 threading 库而不是 multiprocessing .

分析您的程序(如果它很慢)并弄清楚它实际将时间花在哪里而不是盲目地尝试应用优化技巧总是很重要的。

请求的解决方案

如果您使用缓冲 IO 并且您的用例适合操作系统缓冲,则 IO 已经是异步的。您可能获得一些优势的唯一地方是预读,但如果您使用迭代器访问文件(您正在做的),Python 已经这样做了。 AsyncIO 对于移动大量数据并禁用操作系统缓冲区以防止复制数据两次的程序来说是一个优势。

您需要实际分析/基准测试您的程序,看看它是否有改进的余地。如果您的磁盘尚未受到吞吐量限制,那么就有机会通过并行执行每封电子邮件(地址?)的处理来提高性能。检查这一点的最简单方法可能是检查运行程序的核心是否已达到极限(即您受 CPU 限制而不是 IO 限制)。

如果您受 CPU 限制,那么您需要了解线程。不幸的是,Python 线程不能并行工作,除非你有非 Python 的工作要做,所以你必须使用 multiprocessing。 (我假设 validate_email 是一个 Python 函数)。

具体如何进行取决于程序中的瓶颈在哪里,以及需要多快的速度才能达到 IO 限制的程度(因为实际上你不能比你可以停止优化的速度更快你达到了那个点)。

emails set 对象很难共享,因为您需要锁定它,所以最好将它放在一个线程中。看着 multiprocessing图书馆最容易使​​用的机制可能是Process Pools .

使用它,您需要将可迭代的文件包装在 itertools.ifilter 中丢弃重复项,然后将其输入 Pool.imap_unordered然后迭代该结果并写入您的两个输出文件。

类似于:

with open(email_path) as f:
for result in Pool().imap_unordered(validate_map,
itertools.ifilter(unique, f):
(good, email) = result
if good:
good_emails.write(email)
else:
bad_emails.write(email)

validate_map 函数应该像这样简单:

def validate_map(e):
return (validate_email(e.strip(), verify=True), e)

unique 函数应该是这样的:

seen_emails = set()
def unique(e):
if e in seen_emails:
return False
seen_emails.add(e)
return True

ETA:我刚刚意识到 validate_email 是一个实际联系 SMTP 服务器的库。鉴于它在 Python 代码中不忙,您可以使用线程。 threading API 虽然不如多处理库方便,但您可以使用 multiprocessing.dummy有一个基于线程的池。

如果您受 CPU 限制,那么拥有比核心更多的线程/进程并不值得,但由于您的瓶颈是网络 IO,您可以从更多的线程/进程中获益。由于进程的成本很高,您希望切换到线程,然后增加并行运行的数量(尽管您应该礼貌地不要对正在连接的服务器进行 DOS 攻击)。

考虑 from multiprocessing.dummy import Pool as ThreadPool 然后调用 ThreadPool(processes=32).imap_unordered()

关于python - 如何在 python 代码段上应用并行或异步 I/O 文件写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33017446/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com