gpt4 book ai didi

python - 如何从 python 中的主脚本退出多处理池进程中的 while 循环

转载 作者:行者123 更新时间:2023-12-05 06:13:39 27 4
gpt4 key购买 nike

编辑 3:请参阅末尾的最后一个示例。

我需要一个 while 循环,通过 USB 连接进行连续的发送和返回操作。在此连续操作期间,我需要(在我的主脚本中的其他内容中)在同一个 USB 连接上进行一些相同且隔离的发送/返回操作。这似乎需要多处理和一些调整。

我想对多处理库使用以下解决方法:

  1. 将连续发送/返回操作放在具有池 (apply_async) 的不同线程上。
  2. 当我执行隔离的发送/返回操作(使用 clear())时,将此过程置于“暂停”状态。
  3. 在隔离的发送/返回操作之后立即恢复连续的发送/返回(使用 set())。
  4. 当我到达主脚本的末尾时停止连续发送/返回(这里我没有解决方案,但应该是 x.stop() 或类似的东西,因为 terminate() 不会做)。
  5. 从停止的进程中获取一些返回值(使用 get())。

我已经尝试了几件事,但我就是无法通过主命令退出 while 循环。

   import multiprocessing
import time

def setup(event):
global unpaused
unpaused = event

class func:
def __init__(self):
self.finished = False

def stop(self):
self.finished = True

def myFunction(self, arg):
i = 0
s=[]
while self.finished == False:
unpaused.wait()
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
return s

if __name__ == "__main__":
x=func()
event = multiprocessing.Event() # initially unset, so workers will be paused at first
pool = multiprocessing.Pool(1, setup, (event,))
result = pool.apply_async(x.myFunction, (10,))
print('We unpause for 2 sec')
event.set() # unpause
time.sleep(2)
print('We pause for 2 sec')
event.clear() # pause
time.sleep(2)
print('We unpause for 2 sec')
event.set() # unpause
time.sleep(2)
print('Now we try to terminate in 2 sec')
time.sleep(2)
x.stop()
return_val = result.get()
print('get worked with '+str(return_val))

有人能指出我正确的方向吗?正如所见,这不会因 x.stop() 而停止。全局值也不起作用。

提前致谢。

编辑:

按照建议,我尝试将多处理放在一个单独的对象中。这是通过将函数放在类中来完成的,就像我下面的示例一样吗?

import multiprocessing
import time

class func(object):
def __init__(self):
self.event = multiprocessing.Event() # initially unset, so workers will be paused at first
self.pool = multiprocessing.Pool(1, self.setup, (self.event,))

def setup(self):
global unpaused
unpaused = self.event

def stop(self):
self.finished = True

def resume(self):
self.event.set() # unpause

def hold(self):
self.event.clear() #pause

def run(self, arg):
self.pool.apply_async(self.myFunction, (arg,))

def myFunction(self, arg):
i = 0
s=[]
self.finished = False
while self.finished == False:
unpaused.wait()
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
return s

if __name__ == "__main__":
x=func()
result = x.run(10)
print('We unpause for 2 sec')
x.resume() # unpause
time.sleep(2)
print('We pause for 2 sec')
x.hold() # pause
time.sleep(2)
print('We unpause for 2 sec')
x.resume() # unpause
time.sleep(2)
print('Now we try to terminate in 2 sec')
time.sleep(2)
x.stop()
return_val = result.get()
print('get worked with '+str(return_val))

我添加了一个 hold 和 resume 函数,并将设置函数放在一个类中。但是下面的例子甚至不会再运行这个函数了。多么复杂的小问题。我对此感到困惑。

编辑2:我尝试了一种解决方法,解决了目前发现的问题。使用 microprocessing.pool 库时遇到了大麻烦。通过 USB 连接使用它并不简单......我在下面制作了一个平庸的解决方法:

from multiprocessing.pool import ThreadPool
import time

class switch:
state = 1
s1 = switch()

def myFunction(arg):
i = 0

while s1.state == 1 or s1.state == 2 or s1.state == 3:
if s1.state == 1:
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
elif s1.state == 2:
print('we entered snippet mode (state 2)')
time.sleep(1)
x = s
return x
pool.close()
pool.join()
elif s1.state == 3:
while s1.state == 3:
time.sleep(1)
print('holding (state 3)')
return s


if __name__ == "__main__":
global s
s=[]

print('we set the state in the class on top to ' +str(s1.state))
pool = ThreadPool(processes=1)
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we switch mode sir, buckle up')
time.sleep(5)
s1.state = 2
print('we switched for a snippet which is')
snippet = async_result.get()
print(str(snippet[-1])+' this snippet comes from main')
time.sleep(1)
print('now we return to see the full list in the end')
s1.state = 1
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we hold it')
time.sleep(5)
s1.state = 3
print('in 5 sec we exit')
time.sleep(5)
s1.state = 0
return_val = async_result.get()
print('Succsses if you see a list of numbers '+ str(return_val))

编辑 3:

from multiprocessing.pool import ThreadPool
import time

class switch:
state = 1
s1 = switch()

def myFunction(arg):
i = 0

while s1.state == 1 or s1.state == 2:
if s1.state == 1:
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
elif s1.state == 2:
print('we entered snippet mode (state 2)')
time.sleep(1)
x = s
return x
pool.close() #These are not relevant i guess.
pool.join() #These are not relevant i guess.

return s


if __name__ == "__main__":
global s
s=[]

print('we set the state in the class on top to ' +str(s1.state))
pool = ThreadPool(processes=1)
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we switch mode sir, buckle up')
time.sleep(5)
s1.state = 2
snippet = async_result.get()
print(str(snippet[-1])+' this snippet comes from main')
time.sleep(1)
print('now we return to see the full list in the end')
s1.state = 1
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we exit')
time.sleep(5)
s1.state = 0
return_val = async_result.get()
print('Succsses if you see a list of numbers '+ str(return_val))

嗯,这就是我想出的...不好也不可怕。也许更可怕的一面(:

我讨厌在获取单个数据后必须调用函数 pool.apply_async(myFunction, (10,))。目前只有 ThreadingPool 在我的实际脚本中没有进一步的代码更改的情况下工作!

最佳答案

在我需要一个进程持续运行,同时偶尔做其他事情的情况下,我喜欢使用 asyncio。这是我将如何处理这个问题的粗略草稿

import asyncio


class MyObject:
def __init__(self):
self.mydatastructure = []
self.finished = False
self.loop = None

async def main_loop(self):
while not self.finished:
new_result = self.get_data()
self.mydatastructure.append(new_result)
await asyncio.sleep(0)

async def timed_loop(self):
while not self.finished:
await asyncio.sleep(2)
self.dotimedtask(self.mydatastructure)

async def run(self):
await asyncio.gather(self.main_loop(), self.timed_loop())


asyncio.run(MyObject().run())

一次只会运行一个协程,定时协程每 2 秒调度一次。它总是会从最近的连续执行中获取数据。你也可以做一些事情,比如在一个对象上保持连接打开。根据您的要求(是 2 秒间隔,还是每隔一秒一次,无论需要多长时间),有一些库包可以使调度更加优雅。

关于python - 如何从 python 中的主脚本退出多处理池进程中的 while 循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63234624/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com