python - How to process input in parallel with Python, but without processes?

Reposted  Author: 太空宇宙  Updated: 2023-11-03 11:48:40

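I have a list of input data that I want to process in parallel. Processing each item takes time because it involves network I/O; CPU usage is not a concern.

I don't want the overhead of extra processes, because I have many things to process at once and don't want to set up inter-process communication.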

# the parallel execution equivalent of this?
import time

input_data = [1, 2, 3, 4, 5, 6, 7]
input_processor = time.sleep
# list() forces evaluation: in Python 3, map() is lazy and would not run anything
results = list(map(input_processor, input_data))

The code I'm working with already uses twisted.internet.defer, so a solution involving it would also be fine.
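If the I/O calls themselves are non-blocking, the waits can also be overlapped on a single thread by an event loop, which is the model Twisted's Deferreds are built on. A minimal sketch, assuming the standard library's asyncio as a stand-in for Twisted (it does not use twisted.internet.defer), with asyncio.sleep playing the role of the network request; process and process_all are illustrative names:

```python
import asyncio
import time


async def process(x):
    # Stand-in for a network call: asyncio.sleep yields to the event loop
    # instead of blocking, so other coroutines run while this one waits.
    await asyncio.sleep(x)
    return 2 * x


async def process_all(items):
    # Schedule every coroutine at once; gather returns results in input order.
    return await asyncio.gather(*(process(x) for x in items))


if __name__ == '__main__':
    start = time.perf_counter()
    results = asyncio.run(process_all([0.1] * 7))
    elapsed = time.perf_counter() - start
    # The seven 0.1 s waits overlap, so elapsed should be near 0.1 s, not 0.7 s.
    print(results, elapsed)
```

This only helps when the network library is itself asynchronous: a blocking call such as time.sleep inside a coroutine would stall the whole loop.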

Best answer

You can easily define Worker threads that pull items from a shared queue in parallel until the queue is empty.

from threading import Thread
from collections import deque
import time


# Create a new class that inherits from Thread
class Worker(Thread):

    def __init__(self, inqueue, outqueue, func):
        '''
        A worker that calls func on objects in inqueue and
        pushes the result into outqueue

        runs until inqueue is empty
        '''
        super().__init__()
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func

    # override the run method, this is started when
    # you call worker.start()
    def run(self):
        while True:
            # Checking "while self.inqueue" and then popping is not atomic:
            # another worker could take the last item in between. popleft()
            # itself is thread-safe, so try it and stop on IndexError.
            try:
                data = self.inqueue.popleft()
            except IndexError:
                break
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    data = 12 * [1]
    queue = deque(data)
    result = deque()

    # create 3 workers working on the same input
    workers = [Worker(queue, result, test) for _ in range(3)]

    # start the workers
    for worker in workers:
        worker.start()

    # wait till all workers are finished
    for worker in workers:
        worker.join()

    # results are in completion order, not input order
    print(result)
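The standard library's queue.Queue is built for passing items between threads, and its get_nowait() either returns an item or raises queue.Empty in one call. A minimal sketch of the same worker pattern on top of it, using plain functions as thread targets instead of a Thread subclass (worker, run_threaded and slow_double are illustrative names, not part of the answer):

```python
import queue
import threading
import time


def worker(tasks, results, func):
    # Pull items until the shared queue is exhausted, then let the thread end.
    while True:
        try:
            item = tasks.get_nowait()
        except queue.Empty:
            return
        results.put(func(item))


def run_threaded(func, data, n_threads=3):
    # Hypothetical helper: fill a thread-safe queue, start n_threads workers
    # on it, wait for them, then drain the results.
    tasks = queue.Queue()
    for item in data:
        tasks.put(item)
    results = queue.Queue()

    threads = [threading.Thread(target=worker, args=(tasks, results, func))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All workers have exited, so draining here cannot race with them.
    # Items come out in completion order, not input order.
    out = []
    while not results.empty():
        out.append(results.get_nowait())
    return out


def slow_double(x):
    time.sleep(x)  # stands in for a network request
    return 2 * x


if __name__ == '__main__':
    print(run_threaded(slow_double, 12 * [1]))  # twelve 2s after about 4 seconds
```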

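As expected, this runs in about 4 seconds: the 12 one-second tasks are shared by 3 threads, so they finish in 4 rounds.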
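The estimate generalizes: equal-length tasks that mostly wait (sleeping, or blocked on the network) run in ceil(n_tasks / n_workers) rounds, because in CPython time.sleep and blocking socket calls release the GIL. CPU-bound tasks would not speed up this way. A small sketch that computes the estimate and times a scaled-down run with 0.1-second tasks (estimated_wall_time and measure are hypothetical helpers):

```python
import math
import threading
import time


def estimated_wall_time(n_tasks, n_workers, task_seconds):
    # Equal-length waiting tasks finish in ceil(n_tasks / n_workers) rounds.
    return math.ceil(n_tasks / n_workers) * task_seconds


def measure(n_tasks, n_workers, task_seconds):
    # Run n_tasks sleeps on n_workers threads and return the elapsed seconds.
    lock = threading.Lock()
    remaining = n_tasks

    def work():
        nonlocal remaining
        while True:
            with lock:  # claim one task; the lock stops two threads taking it
                if remaining == 0:
                    return
                remaining -= 1
            time.sleep(task_seconds)  # waiting, like network I/O, releases the GIL

    threads = [threading.Thread(target=work) for _ in range(n_workers)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == '__main__':
    print(estimated_wall_time(12, 3, 1.0))  # 4.0
    print(measure(12, 3, 0.1))  # expected to be close to 0.4; exact value varies
```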

You can also write a simple Pool class to remove the noise from the main block:

from threading import Thread
from collections import deque
import time


class Pool:

    def __init__(self, n_threads):
        self.n_threads = n_threads

    def map(self, func, data):
        inqueue = deque(data)
        result = deque()

        workers = [Worker(inqueue, result, func) for _ in range(self.n_threads)]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()

        # results are in completion order, not input order
        return list(result)


class Worker(Thread):

    def __init__(self, inqueue, outqueue, func):
        '''
        A worker that calls func on objects in inqueue and
        pushes the result into outqueue

        runs until inqueue is empty
        '''
        super().__init__()
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func

    # override the run method, this is started when
    # you call worker.start()
    def run(self):
        while True:
            # popleft() is thread-safe; an empty deque raises IndexError,
            # which avoids a race between an emptiness check and the pop
            try:
                data = self.inqueue.popleft()
            except IndexError:
                break
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    data = 12 * [1]

    pool = Pool(6)
    result = pool.map(test, data)

    print(result)
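The standard library already ships a thread pool with this interface: concurrent.futures.ThreadPoolExecutor (and multiprocessing.pool.ThreadPool, which uses threads despite its module name). Unlike the hand-written Pool above, which collects results in completion order, Executor.map yields them in input order. A minimal sketch of the same job swapped onto ThreadPoolExecutor (slow_double is an illustrative name):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x):
    time.sleep(x)  # stands in for a network request
    return 2 * x


if __name__ == '__main__':
    data = 12 * [1]
    # max_workers plays the role of Pool(6); consuming the iterator returned
    # by map waits for the results and yields them in input order.
    with ThreadPoolExecutor(max_workers=6) as executor:
        result = list(executor.map(slow_double, data))
    print(result)  # twelve 2s after roughly 2 seconds
```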

Regarding "python - How to process input in parallel with Python, but without processes?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33347648/
