
python - Asyncio imap fetch mails python3

Reposted. Author: 太空狗. Updated: 2023-10-29 22:22:26

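I'm experimenting with the asyncio module, but I need a hint or suggestion on how to fetch large emails asynchronously.

I have a list of usernames and passwords for the mail accounts.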

data = [
    {'usern': 'foo@bar.de', 'passw': 'x'},
    {'usern': 'foo2@bar.de', 'passw': 'y'},
    {'usern': 'foo3@bar.de', 'passw': 'z'} (...)
]

I thought about:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([get_attachment(d) for d in data]))
loop.close()

However, the slow part is downloading the email attachments.

Mail:

@asyncio.coroutine
def get_attachment(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()

    # list all available mails
    typ, data = connection.search(None, 'ALL')

    for num in data[0].split():
        # fetching each mail
        typ, data = connection.fetch(num, '(RFC822)')
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()

How can I process all mails (downloading the attachments) asynchronously?
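
The part-filtering loop from the question can be tried without an IMAP server. Below is a minimal sketch, assuming a hypothetical helper named `extract_attachments` (not part of the original post) and a test message built in memory with the standard `email` package. It parses the raw bytes directly with `email.message_from_bytes`, which avoids assuming that every mail is valid UTF-8.

```python
from email import message_from_bytes
from email.message import EmailMessage


def extract_attachments(raw_bytes):
    """Hypothetical helper: return (filename, payload) pairs for attachment parts."""
    msg = message_from_bytes(raw_bytes)
    attachments = []
    for part in msg.walk():
        if part.get_content_maintype() == 'multipart':
            continue  # container part, carries no payload of its own
        if part.get('Content-Disposition') is None:
            continue  # plain body text, not an attachment
        filename = part.get_filename()
        if filename:
            attachments.append((filename, part.get_payload(decode=True)))
    return attachments


# Build a small message in memory instead of fetching one over IMAP.
sample = EmailMessage()
sample['Subject'] = 'report'
sample.set_content('see attachment')
sample.add_attachment(b'hello attachment', maintype='application',
                      subtype='octet-stream', filename='data.bin')

found = extract_attachments(sample.as_bytes())
print(found)
```

The same function could be fed the `data[0][1]` bytes returned by `connection.fetch(num, '(RFC822)')`.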

Best answer

If you don't have an IMAP library built on asynchronous I/O, you can simply use a concurrent.futures.ThreadPoolExecutor to do the I/O in threads. Python releases the GIL during I/O, so you get real concurrency:

import asyncio
import email
import imaplib
import threading
from concurrent.futures import ThreadPoolExecutor

def init_connection(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()
    return connection

local = threading.local()  # We use this to get different connections per thread

def do_fetch(num, d, rfc):
    # The pool is shared by all accounts, so each thread caches one
    # connection per account instead of a single connection.
    try:
        connections = local.connections
    except AttributeError:
        connections = local.connections = {}
    if d['usern'] not in connections:
        connections[d['usern']] = init_connection(d)
    return connections[d['usern']].fetch(num, rfc)

async def get_attachment(d, pool):
    connection = init_connection(d)
    # list all available mails
    typ, data = connection.search(None, 'ALL')

    # Kick off one executor job per fetch; run_in_executor returns an awaitable future
    loop = asyncio.get_running_loop()
    futs = [loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)')
            for num in data[0].split()]

    # Process each fetch as it completes
    for fut in asyncio.as_completed(futs):
        typ, msg_data = await fut
        # Parse the raw bytes directly; mails are not guaranteed to be UTF-8
        msg = email.message_from_bytes(msg_data[0][1])
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()


async def main():
    # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
    with ThreadPoolExecutor(max_workers=5) as pool:
        await asyncio.gather(*(get_attachment(d, pool) for d in data))

asyncio.run(main())

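This isn't as good as a solution built on truly asynchronous I/O, because you still pay the cost of creating threads, which limits scalability and adds memory overhead. You also get some GIL-related slowdown from all the Python code that wraps the actual I/O calls. Still, if you're dealing with fewer than a few thousand mails, it should perform fine.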
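
To see the effect of running blocking calls in a pool, here is a minimal sketch in which `time.sleep` stands in for a blocking IMAP fetch (sleeping releases the GIL, as real socket I/O does). The names `blocking_fetch` and `fetch_all` are made up for this illustration. With five workers, five 0.2-second calls should finish in roughly the time of one call rather than the sum of all five, though exact timings depend on the machine.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_fetch(n):
    """Stand-in for a blocking IMAP fetch."""
    time.sleep(0.2)
    return n


async def fetch_all(count, workers):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Each call runs in a worker thread; gather preserves submission order.
        futs = [loop.run_in_executor(pool, blocking_fetch, n) for n in range(count)]
        return await asyncio.gather(*futs)


start = time.perf_counter()
results = asyncio.run(fetch_all(count=5, workers=5))
elapsed = time.perf_counter() - start
print(results, round(elapsed, 2))
```

Lowering `workers` to 1 in this sketch serializes the calls again, which is a quick way to check how much the pool size matters for a given workload.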

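We use run_in_executor to run the blocking calls in the ThreadPoolExecutor as part of the asyncio event loop. It returns an asyncio.Future that can be awaited directly (asyncio.create_task is not needed and only accepts coroutines; older code wrapped the result with asyncio.async, which is no longer available). as_completed then iterates over the futures in the order they complete.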
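
The completion-order behaviour can be shown in isolation. This is a small sketch with made-up job names and delays, again using `time.sleep` as a stand-in for blocking I/O: the jobs are submitted slowest first, but `asyncio.as_completed` hands them back as they finish.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_job(name, delay):
    time.sleep(delay)  # stand-in for a blocking network call
    return name


async def collect_in_completion_order():
    loop = asyncio.get_running_loop()
    jobs = [('slow', 0.45), ('fast', 0.05), ('medium', 0.25)]
    finished = []
    with ThreadPoolExecutor(max_workers=3) as pool:
        futs = [loop.run_in_executor(pool, slow_job, name, delay)
                for name, delay in jobs]
        # as_completed yields awaitables in the order the jobs finish
        for fut in asyncio.as_completed(futs):
            finished.append(await fut)
    return finished


order = asyncio.run(collect_in_completion_order())
print(order)
```

Processing results this way means a large mail that takes a long time to download does not hold up the handling of smaller ones that arrived earlier.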

Edit:

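It seems imaplib is not thread-safe. I've edited my answer to use thread-local storage via threading.local, which lets each thread create its own connection object and reuse it for the thread's entire lifetime, so connections are created per worker thread rather than once per fetch.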
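
The thread-local pattern can be checked without a mail server. The sketch below uses a hypothetical `FakeConnection` class in place of `imaplib.IMAP4_SSL` and counts how many connection objects get created while 20 fetches run on a pool of three threads. The count can never exceed the number of worker threads, however many fetches are submitted.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

local = threading.local()
created = []                      # every connection object that gets created
created_lock = threading.Lock()


class FakeConnection:
    """Hypothetical stand-in for imaplib.IMAP4_SSL."""

    def fetch(self, num):
        time.sleep(0.01)  # pretend to do network I/O
        return (threading.get_ident(), num)


def get_connection():
    # Reuse this thread's connection if it exists, otherwise create it once.
    try:
        return local.connection
    except AttributeError:
        local.connection = FakeConnection()
        with created_lock:
            created.append(local.connection)
        return local.connection


def do_fetch(num):
    return get_connection().fetch(num)


with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(do_fetch, range(20)))

print(len(results), len(created))
```

Because each connection is only ever touched by the thread that created it, no locking around the connection itself is needed.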

Regarding python - Asyncio imap fetch mails python3, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25038204/
