gpt4 book ai didi

Python threading.join() 挂起

转载 作者:行者123 更新时间:2023-12-03 12:47:52 24 4
gpt4 key购买 nike

我的问题如下:我有一个继承自 threading.Thread 的类,我希望它能够正常停止。这个类还有一个队列,它从中获取它的工作。

由于我的项目中有相当多的类应该具有这种行为,所以我创建了一些父类(super class)来减少重复代码,如下所示:

线程相关行为:

class StoppableThread(Thread):
    """Thread with a cooperative stop flag.

    Subclasses should poll ``stopped()`` in their ``run`` loop and exit
    when it returns True; ``stop()`` only requests the stop.
    """

    def __init__(self):
        Thread.__init__(self)
        # NOTE: named _stop_event rather than _stop -- threading.Thread
        # defines an internal _stop() method, and shadowing it with an
        # Event object breaks Thread internals (including join()).
        self._stop_event = Event()

    def stop(self):
        """Request a cooperative stop; run() must notice via stopped()."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been requested."""
        # is_set() is the modern spelling of the deprecated isSet().
        return self._stop_event.is_set()

队列相关行为:

class Queueable():
    """Mixin that owns a job queue which producers can append work to."""

    def __init__(self):
        # Unbounded FIFO shared with whoever consumes self._queue.
        self._queue = Queue()

    def append_to_job_queue(self, job):
        """Enqueue *job* for later processing by a consumer."""
        self._queue.put(job)

结合以上两者并将 queue.join() 添加到 stop() 调用

class StoppableQueueThread(StoppableThread, Queueable):
    """Stoppable worker thread that also owns a job queue."""

    def __init__(self):
        StoppableThread.__init__(self)
        Queueable.__init__(self)

    def stop(self):
        """Set the stop flag, then block until all queued jobs are done.

        NOTE(review): ``Queue.join()`` only returns once ``task_done()``
        has been called for every item ever ``put()`` on the queue -- if
        any consumer forgets ``task_done()``, this call blocks forever.
        """
        super().stop()
        self._queue.join()

数据源的基类:

class DataSource(StoppableThread, ABC):
    """Abstract producer thread: repeatedly fetches one record and hands
    it to the data parser's job queue until stop() is requested."""

    def __init__(self, data_parser):
        StoppableThread.__init__(self)
        # Plain attribute assignment; setName() is a deprecated alias.
        self.name = "DataSource"
        ABC.__init__(self)
        self._data_parser = data_parser

    def run(self):
        # NOTE(review): if _fetch_data() blocks indefinitely (e.g. a
        # Queue.get() with no timeout), the stopped() flag is never
        # re-checked and join() on this thread hangs -- implementations
        # of _fetch_data must not block forever.
        while not self.stopped():
            record = self._fetch_data()
            self._data_parser.append_to_job_queue(record)

    @abstractmethod
    def _fetch_data(self):
        """implement logic here for obtaining a data piece
        should return the fetched data"""

数据源的实现:

class CSVDataSource(DataSource):
    """Data source that replays the rows of a CSV file."""

    def __init__(self, data_parser, file_path):
        DataSource.__init__(self, data_parser)
        self.file_path = file_path
        self.csv_data = Queue()
        print('loading csv')
        self.load_csv()
        print('done loading csv')

    def load_csv(self):
        """Loops through csv and adds data to a queue"""
        with open(self.file_path, 'r') as handle:
            self.reader = reader(handle)
            next(self.reader, None)  # skip header
            for row in self.reader:
                self.csv_data.put(row)

    def _fetch_data(self):
        """Returns next item of the queue"""
        # NOTE(review): Queue.get() with no timeout blocks forever once
        # the CSV is exhausted, so run() never re-checks stopped() and
        # Thread.join() hangs -- likely cause of the reported hang.
        next_item = self.csv_data.get()
        self.csv_data.task_done()
        print(self.csv_data.qsize())
        return next_item

假设有一个名为 ds 的 CSVDataSource 实例,如果我想停止这个线程,我会调用:

ds.stop()
ds.join()

但是,ds.join() 调用永远不会返回。我不确定这是为什么,因为 run() 方法会检查是否设置了停止事件。

有什么想法吗?

更新

按要求更清楚一点:应用程序是由多个线程构建的。 RealStrategy 线程(如下)是所有其他线程的所有者,负责启动和终止它们。我没有为任何线程设置守护进程标志,因此默认情况下它们应该是非守护进程。

主线程是这样的:

if __name__ == '__main__':
    # Install a Ctrl+C (SIGINT) handler that shuts the engine down
    # gracefully before exiting the process.
    # NOTE: the parameter named ``signal`` shadows the signal module
    # inside the handler body; harmless here, but worth renaming.
    def exit_handler(signal, frame):
        rs.stop_engine()
        rs.join()
        sys.exit(0)

    signal.signal(signal.SIGINT, exit_handler)

    # Build the strategy (owner of all worker threads) and start it.
    rs = RealStrategy()
    rs.run_engine()

下面是在 main 中调用的 rs.run_engine() 和 rs.stop_engine() 方法:

class RealStrategy(Thread):
    # Owner of all worker threads; starts and terminates them.
    # (Members elided in the original post.)
    .....
    .....
    def run_engine(self):
        # Start every worker thread, then this owner thread itself.
        self.on_start()
        self._order_handler.start()
        self._data_parser.start()
        self._data_source.start()
        self.start()

    def stop_engine(self):
        # Request a stop on each worker, then join them in turn.
        self._data_source.stop()
        self._data_parser.stop()
        self._order_handler.stop()

        # NOTE(review): each join() below hangs if the corresponding
        # worker's run loop is blocked (e.g. on Queue.get() without a
        # timeout) and never observes its stop flag.
        self._data_source.join()
        self._data_parser.join()
        self._order_handler.join()

        self.stop()

最佳答案

如果你想使用 queue.Queue.join,那么你还必须使用 queue.Queue.task_done。您可以阅读链接的文档,或查看以下从在线可用信息中复制的内容:

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.


Queue.join()

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.


为了测试您的问题,我们创建了一个示例实现来了解发生了什么。它与您的程序的工作方式略有不同,但展示了解决问题的方法:

#! /usr/bin/env python3
import abc
import csv
import pathlib
import queue
import sys
import threading
import time


def main():
    """Demo driver: start the CSV source, consume briefly, then stop it."""
    csv_path = pathlib.Path(r'C:\path\to\file.csv')
    source = CSVDataSource(csv_path)
    source.start()
    # Run the consumer in its own stoppable thread.
    worker = StoppableThread(target=consumer, args=[source])
    worker.start()
    # Give the pipeline a moment to move some rows, then shut down.
    time.sleep(0.1)
    source.stop()


def consumer(data_source):
    """Drain *data_source*: wait for the first item, then print and
    acknowledge every queued task until the queue runs dry."""
    # Busy-wait (with a tiny sleep) until the producer has queued something.
    while data_source.empty:
        time.sleep(0.001)
    # Pull tasks until the queue is empty, acknowledging each one so that
    # the producer's Queue.join() can eventually return.
    while not data_source.empty:
        job = data_source.get_from_queue(True, 0.1)
        print(*job.data, sep=', ', flush=True)
        job.done()


class StopThread(StopIteration):
    # Raised inside a traced thread to make it unwind and terminate;
    # derives from StopIteration so ordinary user code is unlikely to
    # catch it by accident.
    pass


# HACK: rebind the name ``SystemExit`` inside the threading module so
# that threading's internal ``except SystemExit`` clause also swallows
# StopThread, letting a terminated thread exit silently instead of
# printing an unhandled-exception traceback.
threading.SystemExit = SystemExit, StopThread


class StoppableThread(threading.Thread):
    """Thread that can be terminated from outside via a trace hook.

    terminate() flips a flag that a per-frame trace function checks; the
    next traced event inside the thread raises StopThread, unwinding it.
    """

    def _bootstrap(self, stop=False):
        # ``stop`` doubles as the mutable flag shared by the closures below.
        # noinspection PyProtectedMember
        if threading._trace_hook:
            # sys.settrace() below would clobber an already-installed
            # global trace (e.g. a debugger), so refuse to run.
            raise RuntimeError('cannot run thread with tracing')

        def terminate():
            nonlocal stop
            stop = True

        # Name-mangled to _StoppableThread__terminate; only exists once
        # the thread has actually started bootstrapping.
        self.__terminate = terminate

        # noinspection PyUnusedLocal
        def trace(frame, event, arg):
            # Invoked on every traced event in this thread; once the
            # flag is set, raise to unwind the thread.
            if stop:
                raise StopThread

        sys.settrace(trace)
        super()._bootstrap()

    def terminate(self):
        """Ask the running thread to stop at its next traced event.

        Raises RuntimeError if the thread has not been started yet (the
        private __terminate closure is created only in _bootstrap).
        """
        try:
            self.__terminate()
        except AttributeError:
            raise RuntimeError('cannot terminate thread '
                               'before it is started') from None


class Queryable:
    """Thin wrapper around a bounded queue.Queue with a friendlier API."""

    def __init__(self, maxsize=1 << 10):
        # Private (name-mangled) queue; access only via this class's API.
        self.__queue = queue.Queue(maxsize)

    @property
    def empty(self):
        """True when the queue currently holds no items."""
        return self.__queue.empty()

    @property
    def full(self):
        """True when the queue is at capacity."""
        return self.__queue.full()

    def add_to_queue(self, item):
        """Put *item* on the queue, blocking while it is full."""
        self.__queue.put(item)

    def get_from_queue(self, block=True, timeout=None):
        """Pop and return the next item (see queue.Queue.get)."""
        return self.__queue.get(block, timeout)

    def task_done(self):
        """Acknowledge one previously fetched item (enables join_queue)."""
        self.__queue.task_done()

    def join_queue(self):
        """Block until every item ever added has been acknowledged."""
        self.__queue.join()


class StoppableQueryThread(StoppableThread, Queryable):
    """StoppableThread that also carries its own bounded work queue."""

    def __init__(self, target=None, name=None, args=(), kwargs=None,
                 *, daemon=None, maxsize=1 << 10):
        # Thread side is initialised through the MRO; the queue mixin is
        # initialised explicitly with its own maxsize.
        super().__init__(None, target, name, args, kwargs, daemon=daemon)
        Queryable.__init__(self, maxsize)

    def stop(self):
        """Terminate the thread, then wait until every queued item has
        been acknowledged via task_done()."""
        self.terminate()
        self.join_queue()


class DataSource(StoppableQueryThread, abc.ABC):
    """Abstract producer thread that feeds fetched records into its queue."""

    @abc.abstractmethod
    def __init__(self, maxsize=1 << 10):
        super().__init__(None, 'DataSource', maxsize=maxsize)

    def run(self):
        # Producer loop: runs until the thread is unwound by StopThread
        # (raised via terminate() or by _fetch_data on exhaustion).
        while True:
            self.add_to_queue(self._fetch_data())

    @abc.abstractmethod
    def _fetch_data(self):
        """Return the next data record to enqueue."""


class CSVDataSource(DataSource):
    """Data source that yields one Task per CSV row (header skipped)."""

    def __init__(self, source_path):
        super().__init__()
        self.__data_parser = self.__build_data_parser(source_path)

    @staticmethod
    def __build_data_parser(source_path):
        # Lazy generator: the file stays open only while rows are being
        # pulled; the header row is discarded up front.
        with source_path.open(newline='') as source:
            parser = csv.reader(source)
            next(parser, None)
            yield from parser

    def _fetch_data(self):
        """Wrap the next CSV row in a Task; end the thread when the
        file is exhausted."""
        try:
            row = next(self.__data_parser)
        except StopIteration:
            # Translate exhaustion into thread termination.
            raise StopThread from None
        return Task(row, self.task_done)


class Task:
    """Pairs a data record with a completion callback."""

    def __init__(self, data, callback):
        self.__data = data
        self.__callback = callback

    @property
    def data(self):
        """The wrapped record."""
        return self.__data

    def done(self):
        """Invoke the completion callback (typically Queue.task_done)."""
        self.__callback()


if __name__ == '__main__':
    # Run the demo only when executed as a script, not on import.
    main()

关于Python threading.join() 挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47653048/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com