- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
博文参考:
Python的并行(持续更新)_python 并行-CSDN博客 。
《Python并行编程 中文版》 。
一些相关概念请见上一篇博文.
线程是独立的处理流程,可以和系统的其他线程并行或并发地执行.
多线程可以共享数据和资源,利用所谓的共享内存空间.
每一个线程基本上包含3个元素:程序计数器,寄存器和栈.
线程的状态大体上可以分为ready, running, blocked.
多线程编程一般使用共享内容空间进行线程间的通讯,这就使管理内容空间成为多线程编程的重点和难点.
线程的典型应用是应用软件的并行化.
相比于进程,使用线程的优势主要是性能.
class threading.Thread(group=None, ## 一般设置为 None ,这是为以后的一些特性预留的
target=None, ## 当线程启动的时候要执行的函数
name=None, ## 线程的名字,默认会分配一个唯一名字 Thread-N
args=(), ## 使用 tuple 类型给 target 传递参数
kwargs={}) ## 使用 dict 类型给 target 传递参数
group
: 保留参数,通常设置为 None
。这是为以后的一些特性预留的。target
: 线程执行的函数或可调用对象。name
: 线程的名称。如果未指定,将生成一个默认名称,默认为:Thread-N。args
: 传递给 target
的参数元组(整形和浮点型数据输入到args
里)。kwargs
: 传递给 target
的关键字参数字典(字符串型数据输入到这里)。import threading
def function(i):
print("function called by thread %i\n" % i)
return
#threads = []
for i in range(5):
t = threading.Thread(target=function, args=(i,)) ## 用 function 函数初始化一个 Thread 对象 t,并将参数 i 传入;
#threads.append(t)
t.start() ## 线程被创建后不会马上执行,需要手动调用 .start() 方法执行线程
t.join() ## 阻塞调用 t 线程的主线程,t 线程执行结束,主线程才会继续执行
[Run] 。
function called by thread 0
function called by thread 1
function called by thread 2
function called by thread 3
function called by thread 4
function函数的输入只有一个int型数值,这里要注意的是,在使用threading.Thread()传参时,arg需要传入一个元组,所以输入的是(i,),也就是说要加个逗号,。因为type((i))是<class 'int'>.
import threading
# 定义一个线程函数,接受浮点型和字符串型参数
def calculate(data_float, data_string):
result = data_float * 2
print(f"Thread result for {data_float}: {result}")
print(f"Additional string data: {data_string}")
# 创建多个线程并启动
threads = []
data_float = [1.5, 2.5, 3.5] # 浮点型数据
data_string = ["Hello", "World", "OpenAI"] # 字符串型数据
for i in range(len(data_float)):
thread = threading.Thread(target=calculate, args=(data_float[i], data_string[i]))
threads.append(thread)
thread.start()
# 等待所有线程执行完成
for thread in threads:
thread.join()
print("All threads have finished execution.")
[Run] 。
Thread result for 1.5: 3.0
Additional string data: Hello
Thread result for 2.5: 5.0
Additional string data: World
Thread result for 3.5: 7.0
Additional string data: OpenAI
All threads have finished execution.
在 Python 的 threading 模块中,start() 和 join() 是 Thread 类的两个非常重要的方法,它们在多线程编程中扮演着关键的角色.
start()
方法用于启动线程。一旦调用此方法,线程将开始执行其 target
函数,即在创建 Thread
对象时指定的函数。Thread
对象后立即调用,无需任何参数。join()
方法用于等待线程终止。当一个线程执行 join()
方法时,它会等待调用 join()
的线程完成其执行,然后才继续执行。timeout
参数,单位为秒,表示等待线程终止的最长时间。如果 timeout
为 None
,则会无限期等待。start()
和 join()
start() 的必要性
start()
方法是启动这些任务的机制。没有它,线程将不会执行其目标函数。join() 的必要性
join()
用于同步线程,确保主线程等待所有子线程完成。这在以下情况下非常重要:
假设你有一个程序,它启动了多个线程来执行某些任务,然后程序需要在所有线程完成之前等待:
import threading
import time
def do_work():
print("Thread starting.")
time.sleep(1) # 模拟耗时操作
print("Thread finishing.")
if __name__ == "__main__":
threads = []
for i in range(3):
t = threading.Thread(target=do_work)
threads.append(t)
t.start()
for t in threads:
t.join() # 等待所有线程完成
print("All threads have completed.")
在这个例子中,如果没有使用 join(),主线程可能在子线程完成之前就退出了,导致程序结束,而子线程中的任务可能还没有完成。通过使用 join(),我们确保了所有子线程在程序退出前都已经完成.
简单来说就是,如果没有join(),程序运行时首先会输出三个Thread starting,然后再输出All threads have completed,这时候主线程已经结束了,并且后面的程序此时已经开始执行了。而在1秒之后,才会输出Thread finishing,也就是说在这一秒内子线程还没结束,程序就会一边运行后面的程序一边运行子线程... 。
这里还有一个不太正确的写法( 好像网上一些博主是这么举例的):
import threading
import time
def do_work():
print("Thread starting.")
time.sleep(1) # 模拟耗时操作
print("Thread finishing.")
if __name__ == "__main__":
for i in range(3):
t = threading.Thread(target=do_work)
t.start()
t.join()
print("All threads have completed.")
import threading
import time
def do_work():
print("Thread starting.")
time.sleep(1) # 模拟耗时操作
print("Thread finishing.")
if __name__ == "__main__":
for i in range(3):
do_work()
print("All threads have completed.")
通常一个服务进程中有多个线程服务,负责不同的操作,所以对于线程的命名是很重要的; 。
Python中每一个线程在被 Thread 被创建时都有一个默认的名字(可以修改); 。
import threading
import time
def first_func():
print(threading.current_thread().name + str(" is Starting"))
time.sleep(2)
print(threading.current_thread().name + str("is Exiting"))
return
def second_func():
print(threading.current_thread().name + str(" is Starting"))
time.sleep(2)
print(threading.current_thread().name + str("is Exiting"))
return
def third_func():
print(threading.current_thread().name + str(" is Starting"))
time.sleep(2)
print(threading.current_thread().name + str("is Exiting"))
return
if __name__ == "__main__":
t1 = threading.Thread(name="first_func", target=first_func)
t2 = threading.Thread(name="second_func", target=second_func)
t3 = threading.Thread(target=third_func)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
[Run] 。
first_func is Starting
second_func is Starting
Thread-1 is Starting
second_funcis Exiting
first_funcis ExitingThread-1is Exiting
上面的程序输出了当前的进行的线程名称,并且将thrid_func对应的线程命名为Thread-1 。
从上面运行结果可以看出,如果不用 name= 参数指定线程名称的话,那么线程名称将使用默认值.
使用 threading 模块实现一个线程,需要3步:
定义一个 Thread 类的子类; 。
重写 __init__(self, [,args]) 方法; 。
重写 run(self, [,args]) 方法实现一个线程; 。
import threading
import time
class myThread(threading.Thread): ## 定义一个 threading 子类,继承 threading.Thread 父类
def __init__(self, threadID, name, counter): ## 重写 __init__() 方法,并添加额外的参数
threading.Thread.__init__(self) ## 初始化继承自Thread类的属性,使子类对象能够正确地继承和使用父类的属性和方法
self.threadID = threadID ## 子类额外的属性
self.name = name
self.counter = counter
def run(self):
print("Starting " + self.name)
#作用是首先延迟5秒,然后输出当前时间,一共输出self.counter次
print_time(self.name, 5,self.counter) #调用后面的print_time()函数
print("Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
## 创建线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
## 开启线程
thread1.start()
thread2.start()
## .join()
thread1.join()
thread2.join()
print("Exiting Main Thread")
[Run] 。
Starting Thread-1
Starting Thread-2
Thread-1: Wed May 22 20:42:33 2024
Exiting Thread-1Thread-2: Wed May 22 20:42:33 2024
Thread-2: Wed May 22 20:42:38 2024
Exiting Thread-2
Exiting Main ThreadThread
并发线程中,多个线程对共享内存进行操作,并且至少有一个可以改变数据。这种情况下如果没有同步机制,那么多个线程之间就会产生竞争,从而导致代码无效或出错.
解决多线程竞争问题的最简单的方法就是用锁 (Lock)。当一个线程需要访问共享内存时,它必须先获得 Lock 之后才能访问;当该线程对共享资源使用完成后,必须释放 Lock,然后其他线程在拿到 Lock 进行访问资源。因此,为了避免多线程竞争的出现,必须保证:同一时刻只能允许一个线程访问共享内存.
在实际使用中,该方法经常会导致一种 死锁 现象,原因是不同线程互相拿着对方需要的 Lock,导致死锁的发生.
详见: https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/06_Thread_synchronization_with_Lock_and_Rlock.html 。
import threading
shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock() ## 锁
## 有锁的情况
def increment_with_lock():
# shared_resource_with_lock 即最外面的 shared_resource_with_lock
# 这样写就不需要再通过函数的参数引入 shared_resource_with_lock 了
global shared_resource_with_lock
for _ in range(COUNT):
shared_resource_lock.acquire() ## 获取 锁
shared_resource_with_lock += 1
shared_resource_lock.release() ## 释放 锁
def decrement_with_lock():
global shared_resource_with_lock
for _ in range(COUNT):
shared_resource_lock.acquire()
shared_resource_with_lock -= 1
shared_resource_lock.release()
## 没有锁的情况
def increment_without_lock():
global shared_resource_with_no_lock
for _ in range(COUNT):
shared_resource_with_no_lock += 1
def decrement_without_lock():
global shared_resource_with_no_lock
for _ in range(COUNT):
shared_resource_with_no_lock -= 1
if __name__ == "__main__":
t1 = threading.Thread(target=increment_with_lock)
t2 = threading.Thread(target=decrement_with_lock)
t3 = threading.Thread(target=increment_without_lock)
t4 = threading.Thread(target=decrement_without_lock)
## 开启线程
t1.start()
t2.start()
t3.start()
t4.start()
## .join()
t1.join()
t2.join()
t3.join()
t4.join()
print ("the value of shared variable with lock management is %s" % shared_resource_with_lock)
print ("the value of shared variable with race condition is %s" % shared_resource_with_no_lock)
[Run] 。
the value of shared variable with lock management is 0
the value of shared variable with race condition is 79714
尽管在上面的结果中,没锁的情况下得到的结果有时候是正确的,但是执行多次,总会出现错误的结果;而有锁的情况下,执行多次,结果一定是正确的.
尽管理论上用锁的策略可以避免多线程中的竞争问题,但是可能会对程序的其他方面产生负面影响。此外,锁的策略经常会导致不必要的开销,也会限制程序的可扩展性和可读性。更重要的是,有时候需要对多进程共享的内存分配优先级,使用锁可能和这种优先级冲突。从实践的经验来看,使用锁的应用将对debug带来不小的麻烦。所以,最好使用其他可选的方法确保同步读取共享内存,避免竞争条件.
让我们总结一下:
acquire()
和 release()
需要遵循以下规则:
acquire()
将状态改为lockedacquire()
会被block直到另一线程调用 release()
释放锁release()
将导致 RuntimError
异常release()
将状态改为unlocked为了保证 “只有拿到锁的线程才能释放锁”,那么应该使用 RLock() 对象; 。
和 Lock()一样,RLock()也有acquire()和release()两种方法; 。
RLock() 有三个特点:
谁拿到谁释放。如果线程A拿到锁,线程B无法释放这个锁,只有A可以释放; 。
同一线程可以多次拿到该锁,即可以acquire多次; 。
acquire多少次就必须release多少次,只有最后一次release才能改变RLock的状态为unlocked,
import threading
import time
class Box(object):
lock = threading.RLock()
def __init__(self):
self.total_items = 0
def execute(self, n):
Box.lock.acquire()
self.total_items += n
Box.lock.release()
def add(self):
Box.lock.acquire()
self.execute(1)
Box.lock.release()
def remove(self):
Box.lock.acquire()
self.execute(-1)
Box.lock.release()
def adder(box, items):
while items > 0:
print("adding 1 item in the box")
box.add()
time.sleep(1)
items -= 1
def remover(box, items):
while items > 0:
print("removing 1 item in the box")
box.remove()
time.sleep(1)
items -= 1
if __name__ == "__main__":
items = 5
print("putting %s items in the box"% items)
box = Box()
t1 = threading.Thread(target=adder, args=(box, items))
t2 = threading.Thread(target=remover, args=(box, items))
t1.start()
t2.start()
t1.join()
t2.join()
print("%s items still remain in the box " % box.total_items)
[Run] 。
putting 5 items in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
adding 1 item in the box
removing 1 item in the box
0 items still remain in the box
Box类的execute()方法包含RLock,adder()和remover()方法也包含RLock,就是说无论是调用Box还是adder()或者remover(),每个线程的每一步都有拿到资源、释放资源的过程.
信号量的定义: 信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取.
信号量是由操作系统管理的一种抽象数据类型,用于多线程中同步对共享资源的使用; 。
信号量是一个内部数据,用于表明当前共享资源可以有多少并发读取; 。
在 Threading 中,信号量的操作有两个函数:acquire()和release(); 。
同样的,在threading模块中,信号量的操作有两个函数,即 acquire() 和 release() ,解释如下:
acquire()
,此操作减少信号量的内部变量, 如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。release()
释放。这样,信号量的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。虽然表面上看信号量机制没什么明显的问题,如果信号量的等待和通知操作都是原子的,确实没什么问题。但如果不是,或者两个操作有一个终止了,就会导致糟糕的情况.
举个例子,假设有两个并发的线程,都在等待一个信号量,目前信号量的内部值为1。假设第线程A将信号量的值从1减到0,这时候控制权切换到了线程B,线程B将信号量的值从0减到-1,并且在这里被挂起等待,这时控制权回到线程A,信号量已经成为了负值,于是第一个线程也在等待.
这样的话,尽管当时的信号量是可以让线程访问资源的,但是因为非原子操作导致了所有的线程都在等待状态.
注:"原子"指的是原子操作,即一个不可分割的操作。在多线程编程中,如果对信号量的等待和通知操作是原子的,意味着它们是以不可分割的方式执行的,其他线程无法在这些操作中插入。这样可以确保在多线程环境中,对信号量的操作是可靠的.
threading.Semaphore() 可以创建一个信号量对象,它可以控制对共享资源的访问数量。在创建信号量对象时,可以指定初始的许可数量。每次访问资源时,线程需要获取一个许可;当许可数量不足时,线程将会被阻塞,直到有足够的许可可用。访问资源完成后,线程释放许可,使得其他线程可以继续访问资源.
num
表示初始的许可数量(比如这个数量为1)下面的代码展示了信号量的使用,我们有两个线程, producer() 和 consumer() ,它们使用共同的资源,即item。 producer() 的任务是生产item, consumer() 的任务是消费item.
当item还没有被生产出来, consumer() 一直等待,当item生产出来, producer() 线程通知消费者资源可以使用了.
import threading
import time
import random
# 创建一个信号量semaphore,初始值为0。
# 信号量是一种同步机制,用于控制对共享资源的访问。
semaphore = threading.Semaphore(0)
print("init semaphore %s" % semaphore._value) # 打印初始信号量的值。
# 消费者线程将执行的函数。
def consumer():
print("consumer is waiting.") # 打印信息,表明消费者正在等待。
semaphore.acquire() # 消费者尝试获取信号量,如果信号量的值小于1,则等待。
print("consumer notify: consumed item number %s" % item) # 打印消费者消费的项目编号。
print("consumer semaphore %s" % semaphore._value) # 在消费后打印信号量的当前值。
# 生产者线程将执行的函数。
def producer():
global item # 声明item为全局变量,以便在函数内部修改。
time.sleep(10) # 生产者线程暂停10秒,模拟生产过程耗时。
item = random.randint(0, 1000) # 生产者生成一个随机的项目编号。
print("producer notify : produced item number %s" % item) # 打印生产者生产的产品编号。
semaphore.release() # 生产者释放信号量,增加信号量的值,允许其他等待的线程继续执行。
print("producer semaphore %s" % semaphore._value) # 在生产后打印信号量的当前值。
# 主程序入口。
if __name__ == "__main__":
for _ in range(0, 5): # 循环5次,模拟生产和消费过程。
t1 = threading.Thread(target=producer) # 创建生产者线程。
t2 = threading.Thread(target=consumer) # 创建消费者线程。
t1.start() # 启动生产者线程。
t2.start() # 启动消费者线程。
t1.join() # 等待生产者线程完成。
t2.join() # 等待消费者线程完成。
print("program terminated") # 打印程序结束的信息。
最后此篇关于Python并行运算——threading库详解(持续更新)的文章就讲到这里了,如果你想了解更多关于Python并行运算——threading库详解(持续更新)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
如果我错了,但身份验证 session 有 30 天的最大限制,请纠正我?如果是这种情况,有没有办法让我的服务器节点应用程序永远监听经过身份验证的 dataRef? 干杯, 旅行。 最佳答案 自 on
我目前正在阅读 book Continuos Delivery由 Humble/Farley 撰写,虽然里面的很多东西都是有道理的,但有一件事让我烦恼: 似乎作者只针对基于服务器的(单客户端?)应用程
好吧,我非常了解每个人对自制密码管理器的看法,但我希望得到帮助。 不用于实际使用,仅供学习。 我想知道,在 C++ 中如何拥有长期变量。或者真的,有什么长期的。 长期是什么意思?在下次运行 .exe
我在文本文件中有以下三行(最后 3 行): } } } 我想做的是做这样的事情: } } blablabla blablabla blabla
在 iOS 中,有没有一种简单的方法可以在每天的同一时间发送 10 天的推送通知?我不想向所有用户发送推送通知。我的应用程序的工作方式是,用户可以选择连续十天推送通知的时间。您有推荐的 API 吗?或
我正在努力寻找一种当前最先进的方法来处理频繁更新的通知(例如每 3 分钟一次)。似乎在较新的 Android 版本中内置了如此多的电源效率调整(幸运的是!),我之前成功使用的方法(使用 Broadca
我不得不在一些糟糕的房地产网站上花费大量时间。我比较精通 CSS,并且可以(在 FireFox 中)“检查元素”并更改 CSS 以隐藏或缩小特定页面的华而不实的元素。但我想将此自定义 CSS 应用于特
目前正在研究如何使用 signalR 在处理文件时向用户呈现文件的进度报告。我正在使用 asp.net MVC 4。通过 Ajax 进行发布/获取时,我可以轻松获取状态更改。 因为我需要上传一个文件(
这个问题在这里已经有了答案: How can I round up the time to the nearest X minutes? (15 个答案) Is there a simple fun
我有一个 php 脚本,我想运行特定的时间(例如 5 分钟),但只能运行一次。对于 cron 作业,这将无限期地运行。还有别的办法吗? 最佳答案 处理这个问题的方法是: 当某些事件触发需要 cron
我弄乱了我的 apache 和 php.ini 文件,我网站的用户仍然提示该网站在很短的时间后或每次他们关闭并打开同一个浏览器时将他们注销。 我正在运行 Apache 和 PHP。 我应该进行哪些设置
如何查询今天的总和需要减去前一天的总和,每天持续一个月。 SELECT COUNT(DISTINCT member_profile.memberProfileNumber) FROM member_p
这个问题在这里已经有了答案: How do I add a delay in a JavaScript loop? (32 个答案) 关闭 8 年前。 我认为这个问题之前一定有人问过,但我找不到其他
用户在我的网站上注册后,我们会向他发送一封确认电子邮件。我想要的是 - 三天内每 24 小时为用户重新发送一次电子邮件。例如: user_table id , name, date_registere
最近我从 Codeigniter 换到了 Laravel,一切都很顺利,除了我遇到了 Session::flash 的问题。 当我创建新用户时,我收到成功消息,但它会持续 2 个请求,即使我没有通过验
如果有人能帮助我解决这个问题,我将非常感激。 我正在尝试针对 CPU 使用率 >= 80% 持续 30 分钟或更长时间创建 Azure 监视器警报 我已附上警报规则条件的屏幕截图。在“评估依据”下,聚
如果有人能帮助我解决这个问题,我将非常感激。 我正在尝试针对 CPU 使用率 >= 80% 持续 30 分钟或更长时间创建 Azure 监视器警报 我已附上警报规则条件的屏幕截图。在“评估依据”下,聚
希望大家平安 1。我的目标 我正在尝试模拟 3 天的真实情况。系统每天只能工作 8 小时。 我的目标是模型运行 8 小时,持续 3 天,以获得足够的数据进行分析。 2。我的问题 我有一个代理预约时间表
我需要在 8 小时内每 5 分钟调用一次函数。问题是它必须是同一天。例如,如果用户在 3/29 晚上 11:59 登录系统,而现在是 3/30 凌晨 12:01,则不应再调用该函数。 我知道如何每
我正在开发一个 React Native 应用程序,该应用程序使用 Firebase 的 Firestore 作为后端。现在,每次收到新消息时,我都会从 Firestore 获取所有消息并更新我的状态
我是一名优秀的程序员,十分优秀!