- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Python并发之多进程的方法实例代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
一,进程的理论基础 。
一个应用程序,归根结底是一堆代码,是静态的,而进程才是执行中的程序,在一个程序运行的时候会有多个进程并发执行.
进程和线程的区别:
进程与线程的共同点:
都是为了提高程序运行效率,都有执行的优先权 。
二,Python的多进程( multiprocessing模块) 。
创建一个进程(和创建线程类似) 。
方法一:创建Process对象,通过对象调用start()方法启动进程 。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from
multiprocessing
import
Process
def
foo(name):
print
(
'hello,%s'
%
name)
if
__name__
=
=
'__main__'
:
p1
=
Process(target
=
foo,args
=
(
'world'
,))
p2
=
Process(target
=
foo, args
=
(
'China'
,))
p1.start()
p2.start()
print
(
'=====主进程====='
)
# == == =主进程 == == =
# hello, world
# hello, China
#主进程和子进程并发执行
|
注意:Process对象只能在在 if __name__ == '__main__':下创建,不然会报错.
方法二:自定义一个类继承Process类,并重写run()方法,将执行代码放在其内 。
1
2
3
4
5
6
7
8
9
10
11
12
|
from
multiprocessing
import
Process
class
MyProcess(Process):
def
__init__(
self
,name):
super
().__init__()
self
.name
=
name
def
run(
self
):
print
(
'hello,%s'
%
self
.name)
if
__name__
=
=
'__main__'
:
myprocess1
=
MyProcess(
'world'
)
myprocess2
=
MyProcess(
'world'
)
myprocess1.start()
myprocess2.start()
|
Process内置方法 。
实例方法
p.start():启动进程,并调用该子进程中的p.run() 。
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 。
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 。
p.is_alive():如果p仍然运行,返回True 。
p.join([timeout]):主线程等待p终止。timeout是可选的超时时间 Process属性 。
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 。
p.name:进程的名称 。
p.pid:进程的pid 。
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 。
守护进程 。
类似于守护线程,只不过守护线程是对象的一个方法,而守护进程封装成对象的属性.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from
multiprocessing
import
Process
import
time
class
MyProcess(Process):
def
__init__(
self
,name):
super
().__init__()
self
.name
=
name
def
run(
self
):
time.sleep(
3
)
print
(
'hello,%s'
%
self
.name)
if
__name__
=
=
'__main__'
:
myprocess1
=
MyProcess(
'world'
)
myprocess1.daemon
=
True
myprocess1.start()
print
(
'结束'
)
#不会输出‘hello world',因为设置为守护进程,主进程不会等待
|
也可以使用join方法,使主进程等待 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from
multiprocessing
import
Process
import
time
class
MyProcess(Process):
def
__init__(
self
,name):
super
().__init__()
self
.name
=
name
def
run(
self
):
time.sleep(
3
)
print
(
'hello,%s'
%
self
.name)
if
__name__
=
=
'__main__'
:
myprocess1
=
MyProcess(
'world'
)
myprocess1.daemon
=
True
myprocess1.start()
myprocess1.join()
#程序阻塞
print
(
'结束'
)
join()
|
进程同步和锁 。
进程虽然不像线程共享资源,但是这并不意味着进程间不 需要加锁,比如不同进程会共享同一个终端 ( 屏幕),或者操作同一个文件,数据库,那么数据安全还是很有必要的,因此我们可以加锁.
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from
multiprocessing
import
Process,Lock
import
time
def
a_print(l):
#需要传入对象,因为信息不共享
l.acquire()
print
(
'我要打印信息'
)
time.sleep(
1
)
print
(
'我打印完了'
)
l.release()
if
__name__
=
=
'__main__'
:
l
=
Lock()
for
i
in
range
(
20
):
p
=
Process(target
=
a_print,args
=
(l,))
p.start()
|
信号量(Semaphore) 。
能够并发执行的进程数,超出的进程阻塞,直到有进程运行完成.
Semaphore管理一个内置的计数器, 。
每当调用acquire()时内置计数器-1; 。
调用release() 时内置计数器+1; 。
计数器不能小于0;当计数器为0时,acquire()将阻塞进程直到其他进程调用release().
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from
multiprocessing
import
Process,Queue,Semaphore
import
time,random
def
seat(s,n):
s.acquire()
print
(
'学生%d坐下了'
%
n)
time.sleep(random.randint(
1
,
2
))
s.release()
if
__name__
=
=
'__main__'
:
s
=
Semaphore(
5
)
for
i
in
range
(
20
):
p
=
Process(target
=
seat,args
=
(s,i))
p.start()
print
(
'-----主进程-------'
)
|
注意:其实信号量和锁类似,只是限制进程运行某个代码块的数量(锁为1个),并不是能限制并发的进程,如上述代码,一次性还是创建了20个进程 。
事件(Event) 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from
multiprocessing
import
Process,Event
import
time, random
def
eating(event):
event.wait()
print
(
'去吃饭的路上...'
)
def
makeing(event):
print
(
'做饭中'
)
time.sleep(random.randint(
1
,
2
))
print
(
'做好了,快来...'
)
event.
set
()
if
__name__
=
=
'__main__'
:
event
=
Event()
t1
=
Process(target
=
eating,args
=
(event,))
t2
=
Process(target
=
makeing,args
=
(event,))
t1.start()
t2.start()
# 做饭中
# 做好了,快来...
# 去吃饭的路上...
|
和线程事件几乎一致 。
进程队列(Queue) 。
进程队列是进程通讯的方式之一。使用multiprocessing 下的Queue 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
from
multiprocessing
import
Process,Queue
import
time
def
func1(queue):
while
True
:
info
=
queue.get()
if
info
=
=
None
:
return
print
(info)
def
func2(queue):
for
i
in
range
(
10
):
time.sleep(
1
)
queue.put(
'is %d'
%
i)
queue.put(
None
)
#结束的标志
if
__name__
=
=
'__main__'
:
q
=
Queue()
p1
=
Process(target
=
func1,args
=
(q,))
p2
=
Process(target
=
func2, args
=
(q,))
p1.start()
p2.start()
Queue类的方法,源码如下:
class
Queue(
object
):
def
__init__(
self
, maxsize
=
-
1
):
#可以传参设置队列最大容量
self
._maxsize
=
maxsize
def
qsize(
self
):
#返回当前时刻队列中的个数
return
0
def
empty(
self
):
#是否为空
return
False
def
full(
self
): 是否满了
return
False
def
put(
self
, obj, block
=
True
, timeout
=
None
):
#放值,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
pass
def
put_nowait(
self
, obj):
#=put(False)
pass
def
get(
self
, block
=
True
, timeout
=
None
): 获取值,get方法有两个可选参数:blocked和timeout。如果blocked为
True
(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为
False
,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
pass
def
get_nowait(
self
):
# = get(False)
pass
def
close(
self
):
#将队列关闭
pass
def
join_thread(
self
):
#略,几乎不用
pass
def
cancel_join_thread(
self
):
pass
|
进程队列源码注释 。
进程池 。
进程的消耗是很大的,因此我们不能无节制的开启新进程,因此我们可以 通过维护一个进程池来控制进程的数量 。这就不同于信号量,进程池可以从源头控制进程数量。在Python中可以通过如下方法使用 。
同步调用 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from
multiprocessing
import
Pool
import
time, random, os
def
func(n):
pid
=
os.getpid()
print
(
'进程%s正在处理第%d个任务'
%
(pid,n),
'时间%s'
%
time.strftime(
'%H-%M-%S'
))
time.sleep(
2
)
res
=
'处理%s'
%
random.choice([
'成功'
,
'失败'
])
return
res
if
__name__
=
=
'__main__'
:
p
=
Pool(
4
)
#创建4个进程,
li
=
[]
for
i
in
range
(
10
):
res
=
p.
apply
(func,args
=
(i,)) 交给进程池处理,处理完成才返回值,会阻塞,即使池内还有空余进程,相当于顺序执行
li.append(res)
for
i
in
li:
print
(i)
|
#进程1916正在处理第0个任务 时间21-02-53 #进程1240正在处理第1个任务 时间21-02-55 #进程3484正在处理第2个任务 时间21-02-57 #进程7512正在处理第3个任务 时间21-02-59 #进程1916正在处理第4个任务 时间21-03-01 #进程1240正在处理第5个任务 时间21-03-03 #进程3484正在处理第6个任务 时间21-03-05 #进程7512正在处理第7个任务 时间21-03-07 #进程1916正在处理第8个任务 时间21-03-09 #进程1240正在处理第9个任务 时间21-03-11 。
从结果可以发现两点:
因此进程池提供了 异步处理 的方式 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from
multiprocessing
import
Pool
import
time, random, os
def
func(n):
pid
=
os.getpid()
print
(
'进程%s正在处理第%d个任务'
%
(pid,n),
'时间%s'
%
time.strftime(
'%H-%M-%S'
))
time.sleep(
2
)
res
=
'处理%s'
%
random.choice([
'成功'
,
'失败'
])
return
res
if
__name__
=
=
'__main__'
:
p
=
Pool(
4
)
li
=
[]
for
i
in
range
(
10
):
res
=
p.apply_async(func,args
=
(i,)) 结果不会立刻返回,遇到阻塞,开启下一个进程,在这,相当于几乎同时出现四个打印结果(一个线程处理一个任务,处理完下个任务才能进来)
li.append(res)
p.close()
#join之前需要关闭进程池
p.join()
#因为异步,所以需要等待池内进程工作结束再继续
for
i
in
li:
print
(i.get())
#i是一个对象,通过get方法获取返回值,而同步则没有该方法
|
关于回调函数 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
from
multiprocessing
import
Pool
import
time, random, os
def
func(n):
pid
=
os.getpid()
print
(
'进程%s正在处理第%d个任务'
%
(pid,n),
'时间%s'
%
time.strftime(
'%H-%M-%S'
))
time.sleep(
2
)
res
=
'处理%s'
%
random.choice([
'成功'
,
'失败'
])
return
res
def
foo(info):
print
(info)
#传入值为进程执行结果
if
__name__
=
=
'__main__'
:
p
=
Pool(
4
)
li
=
[]
for
i
in
range
(
10
):
res
=
p.apply_async(func,args
=
(i,),callback
=
foo) callback()回调函数会在进程执行完之后调用(主进程调用)
li.append(res)
p.close()
p.join()
for
i
in
li:
print
(i.get())
|
有回调函数 。
总结 。
以上所述是小编给大家介绍的Python并发之多进程的方法实例代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我网站的支持! 。
原文链接:http://www.cnblogs.com/ifyoushuai/p/9471569.html 。
最后此篇关于Python并发之多进程的方法实例代码的文章就讲到这里了,如果你想了解更多关于Python并发之多进程的方法实例代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!