gpt4 book ai didi

python - 创建线程Python队列?

转载 作者:行者123 更新时间:2023-11-30 22:17:50 28 4
gpt4 key购买 nike

如何在 Python 中创建一个队列来在后台运行任务?

我尝试通过 asyncio.Queue() 但每当我使用 Queue.put(task) 时它都会立即启动任务。

它适用于在指定时间间隔从数据库接收未知数量条目(文件名)的应用程序。我希望通过这个背景队列实现的是 python 应用程序保持运行并不断返回新的文件名。每次应用程序找到新文件名时,它都应该通过创建一个包含(方法(变量))的任务来处理它们。这些任务都应该被放入一个不断扩展的队列中,该队列自行运行这些任务。这是代码。

class DatabaseHandler:
def __init__(self):
try:
self.cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='mydb')
self.cnx.autocommit = True
self.q = asyncio.Queue()
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
self.get_new_entries(30.0)

def get_new_entries(self, delay):
start_time = t.time()
while True:
current_time = datetime.datetime.now() - datetime.timedelta(seconds=delay)
current_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
data = current_time
print(current_time)
self.select_latest_entries(data)
print("###################")
t.sleep(delay - ((t.time() - start_time) % delay))

def select_latest_entries(self, input_data):
query = """SELECT FILE_NAME FROM `added_files` WHERE CREATION_TIME > %s"""
cursor = self.cnx.cursor()
cursor.execute(query, (input_data,))
for file_name in cursor.fetchall():
file_name_string = ''.join(file_name)
self.q.put(self.handle_new_file_names(file_name_string))
cursor.close()

def handle_new_file_names(self, filename):
create_new_npy_files(filename)
self.update_entry(filename)

def update_entry(self, filename):
print(filename)
query = """UPDATE `added_files` SET NPY_CREATED_AT=NOW(), DELETED=1 WHERE FILE_NAME=%s"""
update_cursor = self.cnx.cursor()
self.cnx.commit()
update_cursor.execute(query, (filename,))
update_cursor.close()

正如我所说,这将立即运行任务。

create_new_npy_files 是静态类中非常耗时的方法。

最佳答案

这个表达式有两个问题:

self.q.put(self.handle_new_file_names(file_name_string))

首先,它实际上调用handle_new_file_names方法并将其结果排入队列。这并不是 asyncio.Queue 所特有的,它是 Python(以及大多数主流语言)中函数调用的工作方式。上面相当于:

_tmp = self.handle_new_file_names(file_name_string)
self.q.put(_tmp)

第二个问题是 asyncio.Queue 操作(如 getput)是 coroutines ,所以你必须等待他们。

如果您想将可调用对象排入队列,可以使用lambda:

await self.q.put(lambda: self.handle_new_file_names(file_name_string))

但是由于队列的使用者在您的控制之下,因此您可以简单地将文件名排入队列,如 @dirn 所建议的:

await self.q.put(file_name_string)

队列的使用者将使用 await self.q.get() 读取文件名并在每个文件上调用 self.handle_new_file_names()

如果您打算使用 asyncio,请考虑 reading a tutorial涵盖基础知识,并切换到 asyncio compliant数据库连接器,以便数据库查询与异步事件循环一起进行。

关于python - 创建线程Python队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49536612/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com