- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
首先,我们得到以下代码:
from validate_email import validate_email
import time
import os
def verify_emails(email_path, good_filepath, bad_filepath):
good_emails = open(good_filepath, 'w+')
bad_emails = open(bad_filepath, 'w+')
emails = set()
with open(email_path) as f:
for email in f:
email = email.strip()
if email in emails:
continue
emails.add(email)
if validate_email(email, verify=True):
good_emails.write(email + '\n')
else:
bad_emails.write(email + '\n')
if __name__ == "__main__":
os.system('cls')
verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")
当 emails.txt
包含大量行 (>1k) 时,我预计联系 SMTP 服务器是我程序中成本最高的部分。使用某种形式的并行或异步 I/O 应该可以大大加快速度,因为我可以等待多个服务器响应而不是顺序等待。
据我所知:
Asynchronous I/O operates by queuing a request for I/O to the file descriptor, tracked independently of the calling process. For a file descriptor that supports asynchronous I/O (raw disk devcies typically), a process can call aio_read() (for instance) to request a number of bytes be read from the file descriptor. The system call returns immediately, whether or not the I/O has completed. Some time later, the process then polls the operating system for the completion of the I/O (that is, buffer is filled with data).
老实说,我不太明白如何在我的程序中实现异步 I/O。谁能花一点时间向我解释一下整个过程?
EDIT 按照 PArakleta 的建议:
from validate_email import validate_email
import time
import os
from multiprocessing import Pool
import itertools
def validate_map(e):
return (validate_email(e.strip(), verify=True), e)
seen_emails = set()
def unique(e):
if e in seen_emails:
return False
seen_emails.add(e)
return True
def verify_emails(email_path, good_filepath, bad_filepath):
good_emails = open(good_filepath, 'w+')
bad_emails = open(bad_filepath, 'w+')
with open(email_path, "r") as f:
for result in Pool().imap_unordered(validate_map,
itertools.ifilter(unique, f):
(good, email) = result
if good:
good_emails.write(email)
else:
bad_emails.write(email)
good_emails.close()
bad_emails.close()
if __name__ == "__main__":
os.system('cls')
verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")
最佳答案
查看了 validate_email
包后,您真正的问题是您没有有效地对结果进行批处理。您应该只为每个域执行一次 MX 查找,然后只连接到每个 MX 服务器一次,通过握手,然后在一个批处理中检查该服务器的所有地址。值得庆幸的是,validate_email
包为您缓存了 MX 结果,但您仍然需要按服务器对电子邮件地址进行分组,以便将查询批量发送到服务器本身。
您需要编辑 validate_email
包来实现批处理,然后可能使用实际的 threading
库而不是 multiprocessing
.
分析您的程序(如果它很慢)并弄清楚它实际将时间花在哪里而不是盲目地尝试应用优化技巧总是很重要的。
请求的解决方案
如果您使用缓冲 IO 并且您的用例适合操作系统缓冲,则 IO 已经是异步的。您可能获得一些优势的唯一地方是预读,但如果您使用迭代器访问文件(您正在做的),Python 已经这样做了。 AsyncIO 对于移动大量数据并禁用操作系统缓冲区以防止复制数据两次的程序来说是一个优势。
您需要实际分析/基准测试您的程序,看看它是否有改进的余地。如果您的磁盘尚未受到吞吐量限制,那么就有机会通过并行执行每封电子邮件(地址?)的处理来提高性能。检查这一点的最简单方法可能是检查运行程序的核心是否已达到极限(即您受 CPU 限制而不是 IO 限制)。
如果您受 CPU 限制,那么您需要了解线程。不幸的是,Python 线程不能并行工作,除非你有非 Python 的工作要做,所以你必须使用 multiprocessing。 (我假设
validate_email
是一个 Python 函数)。
具体如何进行取决于程序中的瓶颈在哪里,以及需要多快的速度才能达到 IO 限制的程度(因为实际上你不能比你可以停止优化的速度更快你达到了那个点)。
emails
set 对象很难共享,因为您需要锁定它,所以最好将它放在一个线程中。看着 multiprocessing图书馆最容易使用的机制可能是Process Pools .
使用它,您需要将可迭代的文件包装在
itertools.ifilter
中丢弃重复项,然后将其输入 Pool.imap_unordered
然后迭代该结果并写入您的两个输出文件。
类似于:
with open(email_path) as f:
for result in Pool().imap_unordered(validate_map,
itertools.ifilter(unique, f):
(good, email) = result
if good:
good_emails.write(email)
else:
bad_emails.write(email)
validate_map
函数应该像这样简单:
def validate_map(e):
return (validate_email(e.strip(), verify=True), e)
unique
函数应该是这样的:
seen_emails = set()
def unique(e):
if e in seen_emails:
return False
seen_emails.add(e)
return True
ETA:我刚刚意识到 validate_email
是一个实际联系 SMTP 服务器的库。鉴于它在 Python 代码中不忙,您可以使用线程。 threading API 虽然不如多处理库方便,但您可以使用 multiprocessing.dummy有一个基于线程的池。
如果您受 CPU 限制,那么拥有比核心更多的线程/进程并不值得,但由于您的瓶颈是网络 IO,您可以从更多的线程/进程中获益。由于进程的成本很高,您希望切换到线程,然后增加并行运行的数量(尽管您应该礼貌地不要对正在连接的服务器进行 DOS 攻击)。
考虑 from multiprocessing.dummy import Pool as ThreadPool
然后调用 ThreadPool(processes=32).imap_unordered()
。
关于python - 如何在 python 代码段上应用并行或异步 I/O 文件写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33017446/
在使用 requests 库中的状态代码时,我遇到了一些奇怪的事情。每个 HTTP 状态代码都有一个常量,有些具有别名(例如,包括 200 的复选标记): url = 'https://httpbin
这是我得到的代码,但我不知道这两行是什么意思: o[arr[i]] = o[arr[i]] || {}; o = o[arr[i]]; 完整代码: var GLOBAL={}; GLOBAL.name
所以这个问题的答案What is the difference between Θ(n) and O(n)? 指出“基本上,当我们说算法是 O(n) 时,它也是 O(n2)、O(n1000000)、O
这是一个快速的想法;有人会说 O(∞) 实际上是 O(1) 吗? 我的意思是它不依赖于输入大小? 所以在某种程度上它是恒定的,尽管它是无限的。 或者是唯一“正确”的表达方式 O(∞)? 最佳答案 无穷
这是真的: log(A) + log(B) = log(A * B) [0] 这也是真的吗? O(log(A)) + O(log(B)) = O(log(A * B)) [1] 据我了解 O(f
我正在解决面试练习的问题,但我似乎无法找出以下问题的时间和空间复杂度的答案: Given two sorted Linked Lists, merge them into a third list i
我了解 Big-Oh 表示法。但是我该如何解释 O(O(f(n))) 是什么意思呢?是指增长率的增长率吗? 最佳答案 x = O(n)基本上意味着 x <= kn对于一些常量 k . 因此 x = O
我正在编写一个函数,该函数需要一个对象和一个投影来了解它必须在哪个字段上工作。 我想知道是否应该使用这样的字符串: const o = { a: 'Hello There' }; funct
直觉上,我认为这三个表达式是等价的。 例如,如果一个算法在 O(nlogn) + O(n) 或 O(nlogn + n) 中运行(我很困惑),我可以假设这是一个O(nlogn) 算法? 什么是真相?
根据 O'Reilly 的 Python in a Nutshell 中的 Alex Martelli,复杂度类 O(n) + O(n) = O(n)。所以我相信。但是我很困惑。他解释说:“N 的两个
O(n^2)有什么区别和 O(n.log(n)) ? 最佳答案 n^2 的复杂性增长得更快。 关于big-o - 大 O 符号 : differences between O(n^2) and O(n
每当我收到来自 MS outlook 的电子邮件时,我都会收到此标记 & nbsp ; (没有空格)哪个显示为?在 <>. 当我将其更改为 ISO-8859-1 时,浏览器页面字符集编码为 UTF-8
我很难理解 Algorithms by S. Dasgupta, C.H. Papadimitriou, and U.V. Vazirani - page 24 中的以下陈述它们将 O(n) 的总和表
我在面试蛋糕上练习了一些问题,并在问题 2给出的解决方案使用两个单独的 for 循环(非嵌套),解决方案提供者声称他/她在 O(n) 时间内解决了它。据我了解,这将是 O(2n) 时间。是我想错了吗,
关于 Java 语法的幼稚问题。什么 T accept(ObjectVisitorEx visitor); 是什么意思? C# 的等价物是什么? 最佳答案 在 C# 中它可能是: O Accept(
假设我有一个长度为 n 的数组,我使用时间为 nlogn 的排序算法对它进行了排序。得到这个排序后的数组后,我遍历它以找到任何具有线性时间的重复元素。我的理解是,由于操作是分开发生的,所以时间是 O(
总和 O(1)+O(2)+ .... +O(n) 的计算结果是什么? 我在某处看到它的解决方案: O(n(n+1) / 2) = O(n^2) 但我对此并不满意,因为 O(1) = O(2) = co
这个问题在这里已经有了答案: 11 年前关闭。 Possible Duplicate: Plain english explanation of Big O 我想这可能是类里面教的东西,但作为一个自学
假设我有两种算法: for (int i = 0; i 2)更长的时间给定的一些n - 其中n这种情况的发生实际上取决于所涉及的算法 - 对于您的具体示例, n 2)分别时间,您可能会看到: Θ(n)
这个问题在这里已经有了答案: Example of a factorial time algorithm O( n! ) (4 个回答) 6年前关闭。 我见过表示为 O(X!) 的 big-o 示例但
我是一名优秀的程序员,十分优秀!