- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我正在尝试编写一个扫描文件夹并收集更新的 SQL 脚本的 python 脚本,然后自动为 SQL 脚本提取数据。在代码中,一个while循环正在扫描新的SQL文件,并发送到数据拉取函数。我无法理解如何使用 while 循环创建动态队列,但也有多进程来运行队列中的任务。
以下代码存在一个问题,即 while 循环迭代在移动到下一次迭代之前会处理一个很长的作业,并收集其他作业来填充空闲的处理器。
更新:
感谢@pbacterio 发现了这个错误,现在错误消息已经消失了。更改代码后,python 代码可以在一次迭代中获取所有作业脚本,并将脚本分发到四个处理器。但是,要进行下一次迭代,扫描并提交新添加的作业脚本,它将被一个很长的作业挂起。知道如何重构代码吗?
我终于找到了解决方案,请参阅下面的答案。原来我要找的是
the_queue = 队列()
the_pool = Pool(4, worker_main,(the_queue,))
对于那些偶然发现类似想法的人,以下是这个自动化脚本的整个架构,它将共享驱动器转换为“用于 SQL 拉取的服务器”或任何其他作业队列“服务器”。
一个。 python 脚本 auto_data_pull.py
如答案所示。您需要添加自己的工作职能。
b.具有以下内容的“批处理脚本”:
启动 C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.py
c。添加由启动计算机触发的任务,运行“批处理脚本”就这样。有用。
Python 代码:
from glob import glob
import os, time
import sys
import CSV
import re
import subprocess
import pandas as PD
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = compute(func, args)
output.put(result)
#
# Function used to compute result
#
def compute(func, args):
result = func(args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
def query_sql(sql_file): #test func
#jsl file processing and SQL querying, data table will be saved to csv.
fo_name = os.path.splitext(sql_file)[0] + '.csv'
fo = open(fo_name, 'w')
print sql_file
fo.write("sql_file {0} is done\n".format(sql_file))
return "Query is done for \n".format(sql_file)
def check_files(path):
"""
arguments -- root path to monitor
returns -- dictionary of {file: timestamp, ...}
"""
sql_query_dirs = glob(path + "/*/IDABox/")
files_dict = {}
for sql_query_dir in sql_query_dirs:
for root, dirs, filenames in os.walk(sql_query_dir):
[files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for
filename in filenames if filename.endswith('.jsl')]
return files_dict
##### working in single thread
def single_thread():
path = "Y:/"
before = check_files(path)
sql_queue = []
while True:
time.sleep(3)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
# print sql_queue
for sql_file in sql_queue:
try:
query_sql(sql_file)
except:
pass
##### not working in queue
def multiple_thread():
NUMBER_OF_PROCESSES = 4
path = "Y:/"
sql_queue = []
before = check_files(path) # get the current dictionary of sql_files
task_queue = Queue()
done_queue = Queue()
while True: #while loop to check the changes of the files
time.sleep(5)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
# Create queues
#submit task
for task in TASKS:
task_queue.put(task)
for i in range(NUMBER_OF_PROCESSES):
p = Process(target=worker, args=(task_queue, done_queue)).start()
# try:
# p = Process(target=worker, args=(task_queue))
# p.start()
# except:
# pass
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
# single_thread()
if __name__ == '__main__':
# freeze_support()
multiple_thread()
引用:
最佳答案
multiple_thread()
中的sql_file
在哪里定义
multiprocessing.Process(target=query_sql, args=(sql_file)).start()
您还没有在方法中定义 sql_file
,而且您在 for 循环中使用了该变量。变量的作用域仅限于 for 循环。
关于Python 脚本使用 while 循环来不断更新作业脚本并多处理队列中的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46223523/
如果这不是一个错误,那就是另一个错误。如果不是那样的话,那就是别的东西了。我觉得我的项目已经改变了很多,现在只是试图解决代码签名问题,结果一切都搞砸了。我严格按照说明进行操作,但出现错误,例如当前的“
我不确定是否有一些我不知道的内置变量或规则,或者 make 是否有问题,或者我只是疯了。 对于我的一个项目,我有一个如下的 makefile: CC=g++ CFLAGS=-O3 `libpng-co
我有大约 10 个 div,它们必须不断翻转,每个 div 延迟 3 秒 这个 codrops 链接的最后一个效果是我正在寻找的,但无需单击 div http://tympanus.net/Devel
我如何使用 jQuery 持续运行 PHP 脚本并每秒获取响应,以及将鼠标上的少量数据发送到同一脚本? 我真的必须添加一些随机扩展才能让这么简单的计时器工作吗? 最佳答案 To iterate is
JBoss 4.x EJB 3.0 我见过如下代码(大大简化): @Stateless @TransactionAttribute(TransactionAttributeType.NOT_SUPPO
使用 PHPStorm,我试图忽略每次尝试进行 git 提交时 pop 的 workspace.xml。 我的 .gitignore 看起来像: /.idea/ .idea/workspace.xml
我是一名优秀的程序员,十分优秀!