gpt4 book ai didi

Python多进程通信Queue、Pipe、Value、Array实例

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 25 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章Python多进程通信Queue、Pipe、Value、Array实例由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法.

1)Queue & JoinableQueue 。

queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue.

multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法.

task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成.

join() 阻塞直到queue中的所有的task都被处理(即task_done方法被调用).

代码:

  。

复制代码 代码如下:

import multiprocessing import time 。

  。

class Consumer(multiprocessing.Process):         def __init__(self, task_queue, result_queue):         multiprocessing.Process.__init__(self)         self.task_queue = task_queue         self.result_queue = result_queue 。

    def run(self):         proc_name = self.name         while True:             next_task = self.task_queue.get()             if next_task is None:                 # Poison pill means shutdown                 print ('%s: Exiting' % proc_name)                 self.task_queue.task_done()                 break             print ('%s: %s' % (proc_name, next_task))             answer = next_task() # __call__()             self.task_queue.task_done()             self.result_queue.put(answer)         return 。

class Task(object):     def __init__(self, a, b):         self.a = a         self.b = b     def __call__(self):         time.sleep(0.1) # pretend to take some time to do the work         return '%s * %s = %s' % (self.a, self.b, self.a * self.b)     def __str__(self):         return '%s * %s' % (self.a, self.b) 。

if __name__ == '__main__':     # Establish communication queues     tasks = multiprocessing.JoinableQueue()     results = multiprocessing.Queue()         # Start consumers     num_consumers = multiprocessing.cpu_count()     print ('Creating %d consumers' % num_consumers)     consumers = [ Consumer(tasks, results)                   for i in range(num_consumers) ]     for w in consumers:         w.start()         # Enqueue jobs     num_jobs = 10     for i in range(num_jobs):         tasks.put(Task(i, i))         # Add a poison pill for each consumer     for i in range(num_consumers):         tasks.put(None) 。

    # Wait for all of the tasks to finish     tasks.join()         # Start printing results     while num_jobs:         result = results.get()         print ('Result:', result)         num_jobs -= 1 。

  。

注意小技巧: 使用None来表示task处理完毕.

运行结果:

Python多进程通信Queue、Pipe、Value、Array实例

2)pipe 。

pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法.

代码:

复制代码 代码如下:

from multiprocessing import Process, Pipe 。

  。

def f(conn):     conn.send([42, None, 'hello'])     conn.close() 。

if __name__ == '__main__':     parent_conn, child_conn = Pipe()     p = Process(target=f, args=(child_conn,))     p.start()     p.join()     print(parent_conn.recv())   # prints "[42, None, 'hello']" 。

  。

3)Value + Array 。

Value + Array 是python中共享内存 映射文件的方法,速度比较快.

  。

复制代码 代码如下:

from multiprocessing import Process, Value, Array 。

  。

def f(n, a):     n.value = n.value + 1     for i in range(len(a)):         a[i] = a[i] * 10 。

if __name__ == '__main__':     num = Value('i', 1)     arr = Array('i', range(10)) 。

    p = Process(target=f, args=(num, arr))     p.start()     p.join() 。

    print(num.value)     print(arr[:])         p2 = Process(target=f, args=(num, arr))     p2.start()     p2.join() 。

    print(num.value)     print(arr[:]) 。

# the output is : # 2 # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90] # 3 # [0, 100, 200, 300, 400, 500, 600, 700, 800, 900] 。

  。

最后此篇关于Python多进程通信Queue、Pipe、Value、Array实例的文章就讲到这里了,如果你想了解更多关于Python多进程通信Queue、Pipe、Value、Array实例的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com