- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
本文总结:什么是GIL全局解释器锁,什么是死锁现象? 了解知识-什么是递归锁,信号量; python的多线程在什么场景下适用? 什么是Event事件,线程q; 进程池和线程池的实现; 什么是协程,以及如何使用gevent模块;最后介绍了IO模型的分类···👆
官网文档:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
文档剖析:
在Cpython中GIL全局解释器锁其实也是一把互斥锁,主要用于阻止同一个进程下的多个线程同时被运行(通俗理解:python的多线程无法使用多核优势);
GIL肯定存在于CPython解释器中 主要原因就在于Cpython解释器的内存管理不是线程安全的;
内存管理:垃圾回收机制
Cpython解释器自带的GIL解释器锁,线程要想执行代码去抢锁,抢python解释器,之后才回收,那么这样就能保证了阻止同一个进程下的多个线程同时被运行,不容易造成数据错乱;比如,抢票,如果你提交了订单,那么别人还能操作到你这张票的订单吗?不会了吧;这样就进而使数据不容易错乱;
name='hz'
,还没有来得急绑定关系,垃圾回收机制就可能给你回收了,因为垃圾回收也是线程,想要执行也得拿解释器来执行,但是不是和你的代码串行;验证之前需要明白什么是多道技术
(切换+保存状态)
多道技术什么时候切换:程序执行时间长、程序有IO操作(√,示例利用IO)
from threading import Thread
import time
m = 100
def test():
global m
tmp = m
# time.sleep(1)
# IO操作,造成数据错乱,sleep(1)运行释放了GIL锁,100个线程同样的操作反复执行,导致结果为99,如果没有IO操作结果为0
tmp -= 1
m = tmp
for i in range(100):
t = Thread(target=test)
t.start()
time.sleep(3)
print(m) # 0
'''
同一个进程下的多个线程虽然有GIL的存在不会出现并行的效果,但是如果线程内有IO操作还是会造成数据的错乱,这个时候需要我们额外的添加互斥锁(就不止GIL一把锁了)
'''
补:抢锁释放锁简写方式
a = Lock()
# 方式一:
a.acquire()
'''代码体'''
a.release()
# 方式二:
with a:
'''代码体'''
# 适用with上下文管理器,会自动抢锁释放锁
from threading import Thread,Lock
import time
mutex = Lock()
m = 100
def test():
global m
with mutex:
tmp = m
time.sleep(0.1)
# IO操作,造成数据错乱,sleep(1)运行释放了GIL锁,100个线程同样的操作反复执行,导致结果为99,如果没有IO操作结果为0
tmp -= 1
m = tmp
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=test)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(m) # 0
存在多把锁的情况,会出现死锁现象;
from threading import Thread, Lock
import time
A = Lock()
B = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
A.acquire() # 抢锁
print('%s 抢到了A锁' % self.name) # 相当于获取线程名称
# current_thread().name 获取线程名称
B.acquire()
print('%s 抢到了B锁' % self.name)
time.sleep(1)
B.release() # 释放锁
print('%s 释放了B锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
def func2(self):
B.acquire()
print('%s 抢到了B锁' % self.name)
A.acquire()
print('%s 抢到了A锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
B.release()
print('%s 释放了B锁' % self.name)
for i in range(10):
obj = MyThread()
obj.start()
1、线程1抢A锁,B锁,其他线程等待;
2、线程1释放了B锁,其他线程等待,因为A锁没有释放;线程1释放了A锁,其他线程才能去func1中抢锁;
3、线程1去func2中抢B锁,A锁,其他线程抢func1中的A锁和B锁,现在锁还在线程1手中,那么就可能导致卡死现象;
4、通俗理解,这样就导致了,你要的在我手上,我要的在你手上,比如A和B,A在B家被锁了,B在A家被锁了,A和B拿的自己家的钥匙,但是他们在对方的家中,那么就死锁了;
递归锁特点:可以被连续的acquire和release,但是只能被第一个抢到这把锁执行上述操作,它的内部有一个计数器,每acquire一次计数加一,每release一次计数减一,只要计数不为0,其他人都无法抢锁;
模块:RLock
Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it again
without blocking; the thread must release it once for each time it has
acquired it.
from threading import Thread,Lock,RLock
import time
'''
mutexA = Lock()
mutexB = Lock()
相当于开启了两把锁;
'''
# 两把锁指向同一个内存空间地址,开启一把锁
A = B = RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
A.acquire() # 抢锁
print('%s 抢到了A锁' % self.name) # 相当于获取线程名称
# current_thread().name 获取线程名称
B.acquire()
print('%s 抢到了B锁' % self.name)
time.sleep(1)
B.release() # 释放锁
print('%s 释放了B锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
def func2(self):
B.acquire()
print('%s 抢到了B锁' % self.name)
A.acquire()
print('%s 抢到了A锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
B.release()
print('%s 释放了B锁' % self.name)
if __name__ == '__main__':
for i in range(10):
obj = MyThread()
obj.start()
# 递归锁的适用不会死锁,抢一次锁计数加1,释放计数减1,其实就是一把锁没有导致混乱
信号量在不同的阶段可能对应不同的技术点;
在并发编程中信号量指的也是锁;
通俗理解:
互斥锁是一个厕所,那么信号量就是多个厕所
from threading import Thread,Semaphore
import time
import random
sm = Semaphore(5) # 括号内写几就代表几个坑位
def task(name):
sm.acquire() # 抢锁
print(f'{name} is running!')
# time.sleep(3)
time.sleep(random.randint(1,5))
sm.release() # 释放锁
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task,args=(f'拉屎{i}号',))
t.start()
# 信号量可以理解为拉屎的坑位,三个人抢锁(进入厕所),拉完了就是释放锁了;
如果面试官问你python多线程是不是没用啊?你是不是得分情况回答他,不能直接说有用啊,存在即合理···😂
视情况而定来判断是否需要多线程,看程序的类型;
程序的类型
IO密集型(常用):需要用户给定指令或者反馈来执行,比如博客页面,你动网页它就动,不需要实时更进更新等;
计算密集型:需要长时间的计算,比如自动驾驶,是不是需要长时间计算路况,或者匹配路线等;
单核多线程牛逼,多核情况下,计算密集型开多进程;IO密集型开多线程;
可以开多进程结合多线程,哈哈哈要啥有啥😁
比如有四个任务,每个任务耗时10s;
开设多进程没有太大的优势,共10s+
因为遇到IO操作cpu就不再服务,就需要切换,并且开设进程还需要申请内存空间和拷贝代码;
开设多线程有优势,10s+
不需要消耗额外的资源,只需要一个CPU(单核情况下IO密集型开多线程是完全有优势的,以耗时任务最长的为准!)
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
time.sleep(2)
if __name__ == '__main__':
l=[]
print(os.cpu_count()) #本机为8核
start=time.time()
for i in range(400):
# 开进程
# p=Process(target=work) # run time is 10.905012845993042大部分时间耗费在创建进程上
# 开线程
p=Thread(target=work) # run time is 2.061677932739258
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
# IO密集型的时候,同样的任务多线程只需要2秒
计算密集型(占着cpu不放),比如有四个任务,每个任务耗时10s;
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i
if __name__ == '__main__':
l=[]
print(os.cpu_count()) # 本机为8核
start=time.time()
for i in range(6):
p=Process(target=work) # 多进程耗时:run time is 6.857659339904785
# p=Thread(target=work) # 多线程耗时:run time is 24.021770000457764
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
# 计算密集型,多进程只需6s,这样只需看一个cpu计算任务所需的时间,那么多个cpu同时结束;
# 多线程就得排队来了,执行完一个继续下一个····
一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行;
通俗理解:比如汽车生成车间,肯定得先冲压零部件,然后焊接这些部件(不能扯一体式冲压昂😓),然后涂装工艺,最后总装···
那么把这些一个个流程比作线程或进程,是不是得前面的进行完才能往下走?
案例:
from threading import Thread,Event
import time
event = Event() # 造红绿灯
def light():
print('红灯停')
time.sleep(3)
print('绿灯行')
# 行人走
event.set()
def people(name):
print(f'{name} 等红绿灯')
event.wait()
print(f'{name} 可以走了')
if __name__ == '__main__':
t = Thread(target=light)
t.start()
for i in range(20):
t = Thread(target=people,args=(f'{i}',))
t.start()
同一个进程下的多个线程数据是共享的,为什么同一个进程下还会去使用队列呢?因为队列是管道和锁构成的,使用队列也是为了保证数据得到安全(适用场景自定义)
import queue
# 先进先出的队列
# q = queue.Queue(3)
# q.put(1)
# q.get()
# q.get_nowait()
# q.full()
# q.empty()
# 后进先出队列,其实和堆栈大差不差
# q = queue.LifoQueue(3)
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get()) # 3
# 优先级队列:可以给放入队列中的数据设置进出的优先级
q = queue.PriorityQueue(4)
q.put((10,'111'))
q.put((100,'222'))
q.put((0,'333'))
q.put((-5,'444'))
print(q.get()) # (-5, '444')
# 数字越小,优先级越高,put((优先级,数据))
TCP服务端并发
# 面条版主体
import socket
server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)
while True:
sock,addr = server.accept()
# 通信循环
while True:
try:
data = sock.recv(1024)
if len(data) == 0:break
sock.send(data.upper()) # 发送大写
except ConnectionResetError as e:
print(e)
break
sock.close()
封装版
from threading import Thread
import socket
# 通信函数
def communication(sock):
# 通信循环
while True:
try:
data = sock.recv(1024)
if len(data) == 0: break
sock.send(data.upper()) # 发送大写
except ConnectionResetError as e:
print(e)
break
sock.close()
# 连接客户端函数
def server(ip,port):
server = socket.socket()
server.bind((ip,port))
server.listen(5)
while True:
sock,addr = server.accept()
# 开设多进程/多线程
t = Thread(target=communication,args=(sock,))
t.start()
if __name__ == '__main__':
s = Thread(target=server,args=('127.0.0.1',8080))
s.start()
思考:能否无限制的开设进程或者线程???
肯定是不能无限制开设的,如果单从技术层面上来说无限开设肯定是可以的并且是最高效,但是从硬件层面上来说是无法实现的(硬件的发展永远赶不上软件的发展速度)
这时候就出现了池,我们在合理适用计算机的时候,保证硬件正常工作的前提,去开设多进程和多线程,是最合理的,如果硬件崩溃了,软件也没用了;
模块:from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
# 线程池:固定开设5个线程,5个线程不会重复出现重复创建和销毁(节省资源);
pool = ThreadPoolExecutor(5) # 括号内可以传数字,不传默认开设当前计算机cpu个数五倍的线程
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
"""
def task(n):
print(n)
time.sleep(2)
return f'返回结果:{n*2}'
# pool.submit(task,1) # 朝池子中提交任务(异步)
'''
提交方式:同步、异步
'''
# print('main')
# 朝池子提交20个任务,每次只能接5个
t_list = []
for i in range(20):
res = pool.submit(task,i)
# print(res.result())
# 返回结果,异步变串行,同步提交
t_list.append(res)
pool.shutdown() # 关闭线程池,等待线程池中所有任务运行完毕
# 解决了等待卡顿
for t in t_list: # 异步提交结果,先起任务再返回结果
print('>>>>',t.result())
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
import os
# 进程池:固定开设几个进程,进程不会重复出现重复创建和销毁(节省资源);
pool = ProcessPoolExecutor(5) # 括号内可以传数字,不传默认开设当前计算机cpu个数
def task(n):
print(n,os.getpid())
time.sleep(2)
return f'进程号:{os.getpid()}'
# 异步提交任务的返回结果,应该通过回调机制来获取,而不是下面的for循环最后获取
def call_back(n): # n 返回的对象
print(n.result()) # n.result 相当于res.result
if __name__ == '__main__':
# 朝池子提交20个任务,每次只能接5个
# t_list = []
for i in range(20):
res = pool.submit(task,i).add_done_callback(call_back) # 回调机制
# print(res.result())
# 返回结果,异步变串行,同步提交
# t_list.append(res)
# pool.shutdown()
# # 解决了等待卡顿
# for t in t_list: # 异步提交结果,先起任务再返回结果
# print('>>>>',t.result())
主要操作:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
res = pool.submit(task,i).add_done_callback(call_back) # 回调机制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
# 创建进程池与线程池
# pool = ThreadPoolExecutor(5) # 可以自定义线程数 也可以采用默认策略
pool = ProcessPoolExecutor(5) # 可以自定义线程数 也可以采用默认策略
# 定义一个任务
def task(n):
print(n, os.getpid())
time.sleep(2)
return '>>>:%s' % n ** 2
# 定义一个回调函数:异步提交完之后有结果自动调用该函数
def call_back(a):
print('异步回调函数:%s' % a.result())
# 朝线程池中提交任务
# obj_list = []
for i in range(20):
res = pool.submit(task, i).add_done_callback(call_back) # 异步提交
# obj_list.append(res)
"""
同步:提交完任务之后原地等待任务的返回结果 期间不做任何事
异步:提交完任务之后不愿地等待任务的返回结果 结果由异步回调机制自动反馈
"""
# 等待线程池中所有的任务执行完毕之后 再获取各自任务的结果
# pool.shutdown()
# for i in obj_list:
# print(i.result()) # 获取任务的执行结果 同步
在windows电脑中如果是进程池的使用也需要在__main__下面
进程:资源单位
线程:工作单位
协程:程序员自定义的名词,意思是单线程下实现并发(程序员自己在代码层面上监测我们所有的IO操作,一但遇到IO,我们在代码级别完成切换,这样给cpu的感觉是程序一直在运行,没有IO操作从而提升效率)
多道技术:切换+保存技术
CPU被剥夺的条件:
程序长时间占用
程序进入IO操作
并发去实现切换+保存状态
欺骗CPU的行为:
单线程下我们如果能够自己检测IO操作并且自己实现代码层面的切换
那么对于CPU而言我们这个程序就没有IO操作,CPU会尽可能的被占用
注意切换不一定能提升效率,如果是IO密集型就会提升效率,计算密集型切换就会降低效率;
能够自主监测IO行为并切换
from gevent import monkey;monkey.patch_all()
# 固定代码格式加上之后才能检测所有的IO行为
from gevent import spawn
import time
def play(name):
print('%s play 1' % name)
time.sleep(5)
print('%s play 2' % name)
'''
两个方法有IO操作,代码一直在反复“跳”
'''
def eat(name):
print('%s eat 1' % name)
time.sleep(3)
print('%s eat 2' % name)
start = time.time()
# play('Hammer') # 正常的同步调用
# eat('Hammer') # 正常的同步调用 8s+
g1 = spawn(play, 'Hammer') # 异步提交
g2 = spawn(eat, 'Hammer') # 异步提交
g1.join()
g2.join() # 等待被监测的任务运行完毕
print('主', time.time() - start) # 单线程下实现并发,提升效率5s+
# 并发效果:一个服务端可以同时服务多个客户端
import socket
from gevent import monkey;monkey.patch_all()
from gevent import spawn
def talk(sock):
while True:
try:
data = sock.recv(1024)
if len(data) == 0:break
print(data)
sock.send(data+b'hi')
except ConnectionResetError as e:
print(e)
sock.close()
break
def servers():
server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen()
while True:
sock, addr = server.accept()
spawn(talk,sock)
g1 = spawn(servers)
g1.join()
# 客户端开设几百个线程发消息即可
最牛的情况:多进程下开设多线程,多线程下开设协程
我们以后可能自己动手写的不多,一般都是使用别人封装好的模块或框架
IO模型研究的主要是网络IO(linux系统),理论为主,代码实现大部分为伪代码;
最为常见的一种IO模型,有两个等待的阶段(wait for data、copy data)
计算机1和计算机2数据传输,需要经过拷贝到内存到OSI,然后才到计算机2的OSI七层到内存;
系统调用阶段变为了非阻塞(轮询) 有一个等待的阶段(copy data),轮询的阶段是比较消耗资源的;
通俗的理解:会一直询问kernel有没有数据~,并不会阻塞操作,直到copy才会阻塞;
利用select或者epoll来监管多个程序 一旦某个程序需要的数据存在于内存中了 那么立刻通知该程序去取即可;
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程;通俗理解为:多个人排队取餐,监控select,如果参号了,kernel说好了,然后用户去取餐return;
这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection;
只需要发起一次系统调用 之后无需频繁发送 有结果并准备好之后会通过异步回调机制反馈给调用者;
我正在尝试打印 timeval 类型的值。实际上我可以打印它,但我收到以下警告: 该行有多个标记 格式“%ld”需要“long int”类型,但参数 2 的类型为“struct timeval” 程序
我正在编写自己的 unix 终端,但在执行命令时遇到问题: 首先,我获取用户输入并将其存储到缓冲区中,然后我将单词分开并将它们存储到我的 argv[] 数组中。IE命令是“firefox”以启动存储在
我是 CUDA 的新手。我有一个关于一个简单程序的问题,希望有人能注意到我的错误。 __global__ void ADD(float* A, float* B, float* C) { con
我有一个关于 C 语言 CGI 编程的一般性问题。 我使用嵌入式 Web 服务器来处理 Web 界面。为此,我在服务器中存储了一个 HTML 文件。在此 HTML 文件中包含 JavaScript 和
**摘要:**在代码的世界中,是存在很多艺术般的写法,这可能也是部分程序员追求编程这项事业的内在动力。 本文分享自华为云社区《【云驻共创】用4种代码中的艺术试图唤回你对编程的兴趣》,作者: break
我有一个函数,它的任务是在父对象中创建一个变量。我想要的是让函数在调用它的级别创建变量。 createVariable testFunc() [1] "test" > testFunc2() [1]
以下代码用于将多个连续的空格替换为1个空格。虽然我设法做到了,但我对花括号的使用感到困惑。 这个实际上运行良好: #include #include int main() { int ch, la
我正在尝试将文件写入磁盘,然后自动重新编译。不幸的是,某事似乎不起作用,我收到一条我还不明白的错误消息(我是 C 初学者 :-)。如果我手动编译生成的 hello.c,一切正常吗?! #include
如何将指针值传递给结构数组; 例如,在 txt 上我有这个: John Doe;xxxx@hotmail.com;214425532; 我的代码: typedef struct Person{
我尝试编写一些代码来检索 objectID,结果是 2B-06-01-04-01-82-31-01-03-01-01 . 这个值不正确吗? // Send a SysObjectId SNMP req
您好,提前感谢您的帮助, (请注意评论部分以获得更多见解:即,以下示例中的成本列已添加到此问题中;西蒙提供了一个很好的答案,但成本列本身并未出现在他的数据响应中,尽管他提供的功能与成本列一起使用) 我
我想知道是否有人能够提出一些解决非线性优化问题的软件包的方法,而非线性优化问题可以为优化解决方案提供整数变量?问题是使具有相等约束的函数最小化,该函数受某些上下边界约束的约束。 我已经在R中使用了'n
我是 R 编程的初学者,正在尝试向具有 50 列的矩阵添加一个额外的列。这个新列将是该行中前 10 个值的平均值。 randomMatrix <- generateMatrix(1,5000,100,
我在《K&R II C 编程 ANSI C》一书中读到,“>>”和“0; nwords--) sum += *buf++; sum = (sum >>
当下拉列表的选择发生变化时,我想: 1) 通过 div 在整个网站上显示一些 GUI 阻止覆盖 2)然后处理一些代码 3) 然后隐藏叠加层。 问题是,当我在事件监听器函数中编写此逻辑时,将执行 onC
我正在使用 Clojure 和 RESTEasy 设计 JAX-RS REST 服务器. 据我了解,用 Lisp 系列语言编写的应用程序比用“传统”命令式语言编写的应用程序更多地构建为“特定于领域的语
我目前正在研究一种替代出勤监控系统作为一项举措。目前,我设计的用户表单如下所示: Time Stamp Userform 它的工作原理如下: 员工将选择他/她将使用的时间戳类型:开始时间、超时、第一次
我是一名学生,试图自学编程,从在线资源和像您这样的人那里获得帮助。我在网上找到了一个练习来创建一个小程序来执行此操作: 编写一个程序,读取数字 a 和 b(长整型)并列出 a 和 b 之间有多少个数字
我正在尝试编写一个 shell 程序,给定一个参数,打印程序的名称和参数中的每个奇数词(即,不是偶数词)。但是,我没有得到预期的结果。在跟踪我的程序时,我注意到,尽管奇数词(例如,第 5 个词,5 %
只是想知道是否有任何 Java API 可以让您控制台式机/笔记本电脑外壳上的 LED? 或者,如果不可能,是否有可能? 最佳答案 如果你说的是前面的 LED 指示电源状态和 HDD 繁忙状态,恐怕没
我是一名优秀的程序员,十分优秀!