
Python script using a while loop to keep picking up new job scripts and multiprocess the tasks in a queue


I am trying to write a Python script that scans a folder, collects updated SQL scripts, and then automatically pulls data for those SQL scripts. In the code, a while loop scans for new SQL files and sends them to the data-pull function. What I cannot figure out is how to build a dynamic queue with the while loop while also having multiprocessing run the tasks in that queue.

The problem with the code below is that a while-loop iteration keeps working on one long job before it moves on to the next iteration and collects other jobs to fill the idle processors.

Update:

  1. Thanks to @pbacterio for catching the bug; the error message is gone now. After changing the code, the Python script can pick up all the job scripts in one iteration and distribute them to four processors. However, to get to the next iteration, which scans and submits the newly added job scripts, it gets hung up on a long-running job. Any idea how to restructure the code?

  2. I finally figured out the solution; see the answer below. It turns out what I was looking for is (a minimal sketch of this pattern appears after this list):

    the_queue = Queue()
    the_pool = Pool(4, worker_main, (the_queue,))

  3. For those who stumble upon a similar idea: here is the overall architecture of this automation script, which turns a shared drive into a "server for SQL pulls" or any other job-queue "server".

    a. The Python script auto_data_pull.py, as shown in the answer. You need to add your own job function.

    b. A "batch script" with the following line:

    start C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.py

    c. Add a task triggered at computer startup that runs the "batch script". That's it. It works.
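
For reference, here is a minimal sketch of the Queue()/Pool(4, worker_main, (the_queue,)) pattern mentioned in item 2, assuming a worker_main initializer that blocks on the shared queue forever. The do_job function and the loop at the bottom are illustrative stand-ins for the real job function and folder scan, not part of the original script.

from multiprocessing import Pool, Queue
import time


def worker_main(the_queue):
    # Runs inside every pool process: block on the shared queue and
    # execute jobs as they arrive, forever.
    while True:
        sql_file = the_queue.get(True)
        do_job(sql_file)


def do_job(sql_file):
    # Stand-in for the real job function (query_sql in the script below).
    print("processing {0}".format(sql_file))


if __name__ == '__main__':
    the_queue = Queue()
    the_pool = Pool(4, worker_main, (the_queue,))
    # The scanning loop only enqueues new work; it never waits on a
    # long-running job, so newly added files keep getting picked up.
    while True:
        for sql_file in ["a.jsl", "b.jsl"]:  # stand-in for the folder-scan result
            the_queue.put(sql_file)
        time.sleep(5)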

Python code:

from glob import glob
import os, time
import sys
import csv
import re
import subprocess
import pandas as pd
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = compute(func, args)
        output.put(result)

#
# Function used to compute result
#

def compute(func, args):
    result = func(args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)


def query_sql(sql_file): # test func
    # jsl file processing and SQL querying; the data table will be saved to csv.
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    fo = open(fo_name, 'w')
    print sql_file
    fo.write("sql_file {0} is done\n".format(sql_file))
    fo.close()
    return "Query is done for {0}\n".format(sql_file)


def check_files(path):
    """
    arguments -- root path to monitor
    returns -- dictionary of {file: timestamp, ...}
    """
    sql_query_dirs = glob(path + "/*/IDABox/")

    files_dict = {}
    for sql_query_dir in sql_query_dirs:
        for root, dirs, filenames in os.walk(sql_query_dir):
            for filename in filenames:
                if filename.endswith('.jsl'):
                    full_path = os.path.join(root, filename)
                    files_dict[full_path] = os.path.getmtime(full_path)
    return files_dict


##### working in single thread
def single_thread():
    path = "Y:/"

    before = check_files(path)
    sql_queue = []

    while True:
        time.sleep(3)
        after = check_files(path)
        added = [f for f in after if not f in before]
        deleted = [f for f in before if not f in after]
        overlapped = list(set(list(after)) & set(list(before)))
        updated = [f for f in overlapped if before[f] < after[f]]

        before = after

        sql_queue = added + updated
        # print sql_queue
        for sql_file in sql_queue:
            try:
                query_sql(sql_file)
            except:
                pass


##### not working in queue
def multiple_thread():

    NUMBER_OF_PROCESSES = 4
    path = "Y:/"

    sql_queue = []
    before = check_files(path) # get the current dictionary of sql_files
    task_queue = Queue()
    done_queue = Queue()

    while True: # while loop to check the changes of the files
        time.sleep(5)
        after = check_files(path)
        added = [f for f in after if not f in before]
        deleted = [f for f in before if not f in after]
        overlapped = list(set(list(after)) & set(list(before)))
        updated = [f for f in overlapped if before[f] < after[f]]

        before = after
        sql_queue = added + updated

        TASKS = [(query_sql, sql_file) for sql_file in sql_queue]

        # submit tasks
        for task in TASKS:
            task_queue.put(task)

        # start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            p = Process(target=worker, args=(task_queue, done_queue)).start()
            # try:
            #     p = Process(target=worker, args=(task_queue))
            #     p.start()
            # except:
            #     pass

        # Get and print results
        print 'Unordered results:'
        for i in range(len(TASKS)):
            print '\t', done_queue.get()
        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')

# single_thread()
if __name__ == '__main__':
    # freeze_support()
    multiple_thread()
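
As a side note on update 1: this version stalls partly because the loop blocks on done_queue.get() until every task of the iteration, including the long one, has finished, and only then scans again. A hedged sketch of one possible adjustment (the drain_ready_results helper is hypothetical, not from the original) is to drain only the results that are already available and go straight back to scanning; the solution the question eventually settled on is the Pool/worker_main pattern sketched above.

import Queue as stdlib_queue  # Python 2 std-lib module that defines the Empty exception


def drain_ready_results(done_queue):
    # Print only the results that have already arrived on done_queue,
    # without waiting for jobs that are still running.
    drained = 0
    while True:
        try:
            result = done_queue.get_nowait()
        except stdlib_queue.Empty:
            break
        print('\t' + result)
        drained += 1
    return drained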

References:

  1. Monitoring a directory for file changes with a Python script: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
  2. Multiprocessing: https://docs.python.org/2/library/multiprocessing.html

Best answer

Where is sql_file defined in multiple_thread()?

multiprocessing.Process(target=query_sql, args=(sql_file)).start()

You have not defined sql_file in the method, yet you use that variable in a for loop. The variable's scope is limited to the for loop.
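
For illustration, a sketch of what this points at, assuming the intent was to start one process per file: the Process call has to sit inside the loop that defines sql_file, and args must be a one-element tuple.

for sql_file in sql_queue:
    # sql_file is only in scope inside this loop, and args needs the trailing comma
    Process(target=query_sql, args=(sql_file,)).start()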

The original question, "Python script uses a while loop to keep updating job scripts and multiprocess the tasks in the queue", can be found on Stack Overflow: https://stackoverflow.com/questions/46223523/
