gpt4 book ai didi

python - 通过回调而不是轮询更新长进程的进度

转载 作者:行者123 更新时间:2023-12-01 06:57:49 25 4
gpt4 key购买 nike

在我的 Python 脚本中,我触发了一个封装到类方法中的长进程 (drive()):

汽车.py

import time


class Car(object):

def __init__(self, sleep_time_in_seconds, miles_to_drive):
self.sleep_time_in_seconds = sleep_time_in_seconds
self.miles_to_drive = miles_to_drive

def drive(self):

for mile in range(self.miles_to_drive):
print('driving mile #{}'.format(mile))
time.sleep(self.sleep_time_in_seconds)

应用程序.py

from car import Car

sleep_time = 2
total_miles = 5

car = Car(sleep_time_in_seconds=sleep_time, miles_to_drive=total_miles)
car.drive()


def print_driven_distance_in_percent(driven_miles):
print("Driven distance: {}%".format(100 * driven_miles / total_miles))

在主脚本app.py中,我想知道drive()过程的进度。解决此问题的一种方法是创建一个循环,从 Car 类轮询当前进度。如果 Car 类继承自 Thread - 轮询似乎是我在谷歌上搜索到的预期模式......我只是好奇是否有可能以某种方式从 Car 类中通知主脚本当前进度。我考虑过创建一个包装类,我可以将其作为参数传递给 Car 类,然后汽车实例可以调用包装类的 print_progress 函数。或者是否有一种更Pythonic的方式来按需通知调用者脚本?

谢谢

编辑:

基于 Artiom Kozyrev 的回答 - 这就是我想要实现的目标:

import time
from threading import Thread
from queue import Queue


def ask_queue(q):
"""
The function to control status of our status display thread
q - Queue - need to show status of task
"""
while True:
x = q.get() # take element from Queue
if x == "STOP":
break
print("Process completed in {} percents".format(x))
print("100% finished")


class MyClass:
"""My example class"""
def __init__(self, name, status_queue):
self.name = name
self.status_queue = status_queue

def my_run(self):
"""
The function we would like to monitor
"""
# th = Thread(target=MyClass.ask_queue, args=(self.status_queue,), ) # monitoring thread
# th.start() # start monitoring thread
for i in range(100): # start doing our main function we would like to monitor
print("{} {}".format(self.name, i))
if i % 5 == 0: # every 5 steps show status of progress
self.status_queue.put(i) # send status to Queue
time.sleep(0.1)
self.status_queue.put("STOP") # stop Queue
# th.join()


if __name__ == "__main__":

q = Queue()

th = Thread(target=ask_queue, args=(q,), ) # monitoring thread
th.start() # start monitoring thread

# tests
x = MyClass("Maria", q)
x.my_run()

th.join()

谢谢大家!!

最佳答案

感谢您提出有趣的问题,通常您不需要使用状态作为案例的单独线程,您可以只在您想要监视的方法中打印状态,但出于培训目的,您可以通过以下方式解决问题,请关注评论并随时提问:

import time
from threading import Thread
from queue import Queue


class MyClass:
"""My example class"""
def __init__(self, name, status_queue):
self.name = name
self.status_queue = status_queue

@staticmethod
def ask_queue(q):
"""
The function to control status of our status display thread
q - Queue - need to show status of task
"""
while True:
x = q.get() # take element from Queue
if x == "STOP":
break
print("Process completed in {} percents".format(x))
print("100% finished")

def my_run(self):
"""
The function we would like to monitor
"""
th = Thread(target=MyClass.ask_queue, args=(self.status_queue,), ) # monitoring thread
th.start() # start monitoring thread
for i in range(100): # start doing our main function we would like to monitor
print("{} {}".format(self.name, i))
if i % 5 == 0: # every 5 steps show status of progress
self.status_queue.put(i) # send status to Queue
time.sleep(0.1)
self.status_queue.put("STOP") # stop Queue
th.join()


if __name__ == "__main__":
# tests
x = MyClass("Maria", Queue())
x.my_run()
print("*" * 200)
x.my_run()

关于python - 通过回调而不是轮询更新长进程的进度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58730987/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com