gpt4 book ai didi

Python并发之多进程的方法实例代码

转载 作者:qq735679552 更新时间:2022-09-27 22:32:09 27 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章Python并发之多进程的方法实例代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

一,进程的理论基础 。

一个应用程序,归根结底是一堆代码,是静态的,而进程才是执行中的程序,在一个程序运行的时候会有多个进程并发执行.

进程和线程的区别:

  • 进程是系统资源分配的基本单位。
  • 一个进程内可以包含多个线程,属于一对多的关系,进程内的资源,被其内的线程共享
  • 线程是进程运行的最小单位,如果说进程是完成一个功能,那么其线程就是完成这个功能的基本单位
  • 进程间资源不共享,多进程切换资源开销,难度大,同一进程内的线程资源共享,多线程切换资源开销,难度小

进程与线程的共同点:

都是为了提高程序运行效率,都有执行的优先权 。

二,Python的多进程( multiprocessing模块) 。

创建一个进程(和创建线程类似) 。

方法一:创建Process对象,通过对象调用start()方法启动进程 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process
def foo(name):
  print ( 'hello,%s' % name)
if __name__ = = '__main__' :
  p1 = Process(target = foo,args = ( 'world' ,))
  p2 = Process(target = foo, args = ( 'China' ,))
  p1.start()
  p2.start()
  print ( '=====主进程=====' )
  # == == =主进程 == == =
  # hello, world
  # hello, China
  #主进程和子进程并发执行

注意:Process对象只能在在 if __name__ == '__main__':下创建,不然会报错.

方法二:自定义一个类继承Process类,并重写run()方法,将执行代码放在其内 。

?
1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
class MyProcess(Process):
  def __init__( self ,name):
   super ().__init__()
   self .name = name
  def run( self ):
   print ( 'hello,%s' % self .name)
if __name__ = = '__main__' :
  myprocess1 = MyProcess( 'world' )
  myprocess2 = MyProcess( 'world' )
  myprocess1.start()
  myprocess2.start()

Process内置方法 。

实例方法

p.start():启动进程,并调用该子进程中的p.run() 。

p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  。

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 。

p.is_alive():如果p仍然运行,返回True 。

p.join([timeout]):主线程等待p终止。timeout是可选的超时时间 Process属性 。

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 。

p.name:进程的名称 。

p.pid:进程的pid 。

p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 。

守护进程 。

类似于守护线程,只不过守护线程是对象的一个方法,而守护进程封装成对象的属性.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
import time
class MyProcess(Process):
  def __init__( self ,name):
   super ().__init__()
   self .name = name
  def run( self ):
   time.sleep( 3 )
   print ( 'hello,%s' % self .name)
if __name__ = = '__main__' :
  myprocess1 = MyProcess( 'world' )
  myprocess1.daemon = True
  myprocess1.start()
  print ( '结束' )
#不会输出‘hello world',因为设置为守护进程,主进程不会等待

也可以使用join方法,使主进程等待 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
import time
class MyProcess(Process):
  def __init__( self ,name):
   super ().__init__()
   self .name = name
  def run( self ):
   time.sleep( 3 )
   print ( 'hello,%s' % self .name)
if __name__ = = '__main__' :
  myprocess1 = MyProcess( 'world' )
  myprocess1.daemon = True
  myprocess1.start()
  myprocess1.join() #程序阻塞
  print ( '结束' )
join()

进程同步和锁 。

进程虽然不像线程共享资源,但是这并不意味着进程间不 需要加锁,比如不同进程会共享同一个终端 ( 屏幕),或者操作同一个文件,数据库,那么数据安全还是很有必要的,因此我们可以加锁.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process,Lock
import time
def a_print(l): #需要传入对象,因为信息不共享
  l.acquire()
  print ( '我要打印信息' )
  time.sleep( 1 )
  print ( '我打印完了' )
  l.release()
if __name__ = = '__main__' :
  l = Lock()
  for i in range ( 20 ):
   p = Process(target = a_print,args = (l,))
   p.start()

信号量(Semaphore) 。

能够并发执行的进程数,超出的进程阻塞,直到有进程运行完成.

Semaphore管理一个内置的计数器, 。

每当调用acquire()时内置计数器-1; 。

调用release() 时内置计数器+1; 。

计数器不能小于0;当计数器为0时,acquire()将阻塞进程直到其他进程调用release().

?
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process,Queue,Semaphore
import time,random
def seat(s,n):
  s.acquire()
  print ( '学生%d坐下了' % n)
  time.sleep(random.randint( 1 , 2 ))
  s.release()
if __name__ = = '__main__' :
  s = Semaphore( 5 )
  for i in range ( 20 ):
   p = Process(target = seat,args = (s,i))
   p.start()
  print ( '-----主进程-------' )

注意:其实信号量和锁类似,只是限制进程运行某个代码块的数量(锁为1个),并不是能限制并发的进程,如上述代码,一次性还是创建了20个进程 。

事件(Event) 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process,Event
import time, random
def eating(event):
  event.wait()
  print ( '去吃饭的路上...' )
def makeing(event):
  print ( '做饭中' )
  time.sleep(random.randint( 1 , 2 ))
  print ( '做好了,快来...' )
  event. set ()
if __name__ = = '__main__' :
  event = Event()
  t1 = Process(target = eating,args = (event,))
  t2 = Process(target = makeing,args = (event,))
  t1.start()
  t2.start()
  # 做饭中
  # 做好了,快来...
  # 去吃饭的路上...

和线程事件几乎一致 。

进程队列(Queue) 。

进程队列是进程通讯的方式之一。使用multiprocessing 下的Queue 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from multiprocessing import Process,Queue
import time
def func1(queue):
  while True :
   info = queue.get()
   if info = = None :
    return
   print (info)
def func2(queue):
  for i in range ( 10 ):
   time.sleep( 1 )
   queue.put( 'is %d' % i)
  queue.put( None ) #结束的标志
if __name__ = = '__main__' :
  q = Queue()
  p1 = Process(target = func1,args = (q,))
  p2 = Process(target = func2, args = (q,))
  p1.start()
  p2.start()
Queue类的方法,源码如下:
class Queue( object ):
  def __init__( self , maxsize = - 1 ): #可以传参设置队列最大容量
   self ._maxsize = maxsize
  def qsize( self ): #返回当前时刻队列中的个数
   return 0
  def empty( self ): #是否为空
   return False
  def full( self ): 是否满了
   return False
  def put( self , obj, block = True , timeout = None ): #放值,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
   pass
  def put_nowait( self , obj): #=put(False)
   pass
  def get( self , block = True , timeout = None ): 获取值,get方法有两个可选参数:blocked和timeout。如果blocked为 True (默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为 False ,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
   pass
  def get_nowait( self ): # = get(False)
   pass
  def close( self ): #将队列关闭
   pass
  def join_thread( self ): #略,几乎不用
   pass
  def cancel_join_thread( self ):
   pass

进程队列源码注释 。

进程池 。

进程的消耗是很大的,因此我们不能无节制的开启新进程,因此我们可以 通过维护一个进程池来控制进程的数量 。这就不同于信号量,进程池可以从源头控制进程数量。在Python中可以通过如下方法使用 。

同步调用 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Pool
import time, random, os
def func(n):
  pid = os.getpid()
  print ( '进程%s正在处理第%d个任务' % (pid,n), '时间%s' % time.strftime( '%H-%M-%S' ))
  time.sleep( 2 )
  res = '处理%s' % random.choice([ '成功' , '失败' ])
  return res
if __name__ = = '__main__' :
  p = Pool( 4 ) #创建4个进程,
  li = []
  for i in range ( 10 ):
   res = p. apply (func,args = (i,)) 交给进程池处理,处理完成才返回值,会阻塞,即使池内还有空余进程,相当于顺序执行
   li.append(res)
  for i in li:
   print (i)

#进程1916正在处理第0个任务 时间21-02-53 #进程1240正在处理第1个任务 时间21-02-55 #进程3484正在处理第2个任务 时间21-02-57 #进程7512正在处理第3个任务 时间21-02-59 #进程1916正在处理第4个任务 时间21-03-01 #进程1240正在处理第5个任务 时间21-03-03 #进程3484正在处理第6个任务 时间21-03-05 #进程7512正在处理第7个任务 时间21-03-07 #进程1916正在处理第8个任务 时间21-03-09 #进程1240正在处理第9个任务 时间21-03-11 。

从结果可以发现两点:

  1. 不是并发处理
  2. 一直都只有四个进程,串行执行

因此进程池提供了 异步处理 的方式 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool
import time, random, os
def func(n):
  pid = os.getpid()
  print ( '进程%s正在处理第%d个任务' % (pid,n), '时间%s' % time.strftime( '%H-%M-%S' ))
  time.sleep( 2 )
  res = '处理%s' % random.choice([ '成功' , '失败' ])
  return res
 
if __name__ = = '__main__' :
  p = Pool( 4 )
  li = []
  for i in range ( 10 ):
   res = p.apply_async(func,args = (i,)) 结果不会立刻返回,遇到阻塞,开启下一个进程,在这,相当于几乎同时出现四个打印结果(一个线程处理一个任务,处理完下个任务才能进来)
   li.append(res)
 
  p.close() #join之前需要关闭进程池
  p.join() #因为异步,所以需要等待池内进程工作结束再继续
  for i in li:
   print (i.get()) #i是一个对象,通过get方法获取返回值,而同步则没有该方法

关于回调函数 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Pool
import time, random, os
def func(n):
  pid = os.getpid()
  print ( '进程%s正在处理第%d个任务' % (pid,n), '时间%s' % time.strftime( '%H-%M-%S' ))
  time.sleep( 2 )
  res = '处理%s' % random.choice([ '成功' , '失败' ])
  return res
 
def foo(info):
  print (info) #传入值为进程执行结果
 
if __name__ = = '__main__' :
  p = Pool( 4 )
  li = []
  for i in range ( 10 ):
   res = p.apply_async(func,args = (i,),callback = foo) callback()回调函数会在进程执行完之后调用(主进程调用)
   li.append(res)
 
  p.close()
  p.join()
  for i in li:
   print (i.get())

有回调函数 。

总结 。

以上所述是小编给大家介绍的Python并发之多进程的方法实例代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我网站的支持! 。

原文链接:http://www.cnblogs.com/ifyoushuai/p/9471569.html 。

最后此篇关于Python并发之多进程的方法实例代码的文章就讲到这里了,如果你想了解更多关于Python并发之多进程的方法实例代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com