gpt4 book ai didi

python - 使用协程与线程时的吞吐量差异

转载 作者:太空狗 更新时间:2023-10-30 01:03:21 26 4
gpt4 key购买 nike

几天前我在 SO 上问了一个关于帮助我设计一个构造多个 HTTP 请求的范例的问题

这是场景。我想要一个多生产者、多消费者系统。我的制作人抓取并抓取了一些网站,并将找到的链接添加到队列中。由于我将抓取多个站点,因此我希望有多个生产者/抓取工具。

消费者/ worker 从这个队列中获取数据,向这些链接发出 TCP/UDP 请求并将结果保存到我的 Django 数据库中。我还希望有多个工作人员,因为每个队列项目彼此完全独立。

人们建议为此使用协程库,即 Gevent 或 Eventlet。从未使用过协同程序,我读到即使编程范例类似于线程范例,但只有一个线程在主动执行,但是当发生阻塞调用时 - 例如 I/O 调用 - 堆栈在内存中切换,另一个绿色线程接管直到它遇到某种阻塞 I/O 调用。希望我做对了吗?这是我的一篇 SO 帖子中的代码:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid


def worker(wid):
while True:
item = q.get()
try:
print "Got item %s" % item
do_work(wid, item)
finally:
print "No more items"
q.task_done()


def producer():
while True:
item = random.randint(1, 11)
if item == 10:
print "Signal Received"
return
else:
print "Added item %s" % item
q.put(item)


for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

这很有效,因为 sleep 调用是阻塞调用,当 sleep 事件发生时,另一个绿色线程接管。这比顺序执行快很多。如您所见,我的程序中没有任何代码故意让一个线程执行另一个线程。我看不出这如何适合上面的场景,因为我想让所有线程同时执行。

一切正常,但我感觉我使用 Gevent/Eventlets 实现的吞吐量高于原始顺序运行程序,但大大低于使用实时线程实现的吞吐量。

如果我要使用线程机制重新实现我的程序,我的每个生产者和消费者都可以同时工作,而无需像协程那样进出堆栈。

是否应该使用线程重新实现?我的设计有问题吗?我没有看到使用协程的真正好处。

也许我的概念有点模糊,但这就是我所吸收的。对我的范例和概念的任何帮助或澄清都会很棒。

谢谢

最佳答案

As you can see, I don't have any code in my program that purposely yields the execution of one thread to another thread. I fail to see how this fits into scenario above as I would like to have all the threads executing simultaneously.

有一个操作系统线程,但有多个 greenlet。在您的情况下 gevent.sleep() 允许工作人员并发执行。阻塞 IO 调用,例如 urllib2.urlopen(url).read() 如果您使用 urllib2 修补以与 gevent 一起工作(通过调用 gevent.monkey.patch_*())。

另见 A Curious Course on Coroutines and Concurrency了解代码如何在单线程环境中并发工作。

要比较 gevent、线程、多处理之间的吞吐量差异,您可以编写与所有方法兼容的代码:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
from Queue import Queue as JoinableQueue
from threading import Thread
if concurrency_impl == 'multiprocessing':
from multiprocessing import Process as Thread, JoinableQueue

脚本的其余部分对于所有并发实现都是相同的:

def do_work(wid, value):
time.sleep(random.randint(0,2))
info("%d Task %s done" % (wid, value))

def worker(wid, q):
while True:
item = q.get()
try:
info("%d Got item %s" % (wid, item))
do_work(wid, item)
finally:
q.task_done()
info("%d Done item %s" % (wid, item))

def producer(pid, q):
for item in iter(lambda: random.randint(1, 11), 10):
time.sleep(.1) # simulate a green blocking call that yields control
info("%d Added item %s" % (pid, item))
q.put(item)
info("%d Signal Received" % (pid,))

不要在模块级别执行代码,将其放在 main() 中:

def main():
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(process)d %(message)s")

q = JoinableQueue()
it = count(1)
producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
for t in producers+workers:
t.daemon = True
t.start()

for t in producers: t.join() # put items in the queue
q.join() # wait while it is empty
# exit main thread (daemon workers die at this point)

if __name__=="__main__":
main()

关于python - 使用协程与线程时的吞吐量差异,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9247641/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com