gpt4 book ai didi

python - 由于 eventlet Monkey_patch,APScheduler 任务未触发

转载 作者:行者123 更新时间:2023-12-01 03:27:02 25 4
gpt4 key购买 nike

我有一些 python 代码,其中 APScheduler 作业未触发。作为上下文,我还有一个处理程序,除了使用 eventlet/GreenPool 进行多线程处理之外,它还在查找文件修改的目录。根据一些故障排除,APScheduler 和 eventlet 之间似乎存在某种冲突。

我的输出如下:

2016-12-26 02:30:30 UTC (+0000): Finished Download Pass
2016-12-26 02:46:07 UTC (+0000): EXITING due to control-C or other exit signal
Jobstore default:
Time-Activated Download (trigger: interval[0:05:00], next run at: 2016-12-25 18:35:00 PST) 2016-12-26 02:46:07 UTC (+0000): 1

(18:35 PST = 02:35 UTC)...所以它应该在我按下 control-C 之前 11 分钟触发

from apscheduler import events ## pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler

# Threading
from eventlet import patcher, GreenPool ## pip install eventlet
patcher.monkey_patch(all = True)

def setSchedule(scheduler, cfg, minutes = 60*2, hours = 0):
"""Set up the schedule of how frequently a download should be attempted.
scheduler object must already be declared.
will accept either minutes or hours for the period between downloads"""
if hours > 0:
minutes = 60*hours if minutes == 60 else 60*hours+minutes
handle = scheduler.add_job(processAllQueues,
trigger='interval',
kwargs={'cfg': cfg},
id='RQmain',
name='Time-Activated Download',
coalesce=True,
max_instances=1,
minutes=minutes,
start_date=dt.datetime.strptime('2016-10-10 00:15:00', '%Y-%m-%d %H:%M:%S') # computer's local time
)
return handle

def processAllQueues(cfg):
SQSpool = GreenPool(size=int(cfg.get('GLOBAL','Max_AWS_Connections')))
FHpool = GreenPool(size=int(cfg.get('GLOBAL','Max_Raw_File_Process')))
arSects = []
dGlobal = dict(cfg.items('GLOBAL'))
for sect in filter(lambda x: iz.notEqualz(x,'GLOBAL','RUNTIME'),cfg.sections()):
dSect = dict(cfg.items(sect)) # changes all key names to lowercase
n = dSect['sqs_queue_name']
nn = dSect['node_name']
fnbase = "{}_{}".format(nn,n)
dSect["no_ext_file_name"] = os.path.normpath(os.path.join(cfg.get('RUNTIME','Data_Directory'),fnbase))
arSects.append(mergeTwoDicts(dGlobal,dSect)) # section overrides global
arRes = []
for (que_data,spec_section) in SQSpool.imap(doQueueDownload,arSects):
if que_data: fileResult = FHpool.spawn(outputQueueToFiles,spec_section,que_data).wait()
else: fileResult = (False,spec_section['sqs_queue_name'])
arRes.append(fileResult)
SQSpool.waitall()
FHpool.waitall()
pr.ts_print("Finished Download Pass")
return None

def main():
cfgglob = readConfigs(cfgdir, datdir)
sched = BackgroundScheduler()
cron_job = setSchedule(sched, cfgglob, 5)
sched.start(paused=True)
try:
change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE)
processAllQueues(cfgglob)
sched.resume() # turn the scheduler back on and monitor both wallclock and config directory.
cron_job.resume()
while 1:
SkipDownload = False
result = win32event.WaitForSingleObject(change_handle, 500)
if result == win32con.WAIT_OBJECT_0: # If the WaitForSO returned because of a notification rather than error/timing out
sched.pause() # make sure we don't run the job as a result of timestamp AND file modification
while 1:
try:
win32file.FindNextChangeNotification(change_handle) # rearm - done at start because of the loop structure here
cfgglob = None
cfgglob = readConfigs(cfgdir,datdir)
cron_job.modify(kwargs={'cfg': cfgglob}) # job_id="RQmain",
change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE) # refresh handle
if not SkipDownload: processAllQueues(cfgglob)
sched.resume()
cron_job.resume()
break
except KeyboardInterrupt:
if VERBOSE | DEBUG: pr.ts_print("EXITING due to control-C or other exit signal")
finally:
sched.print_jobs()
pr.ts_print(sched.state)
sched.shutdown(wait=False)

如果我注释掉大部分 processAllQueues 函数以及顶部包含的 eventlet,它会正确触发。如果我保留

from eventlet import patcher, GreenPool ## pip install eventlet
patcher.monkey_patch(all = True)

但是注释掉 processAllQueues 直到倒数第二行中的打印行,它无法触发 APScheduler,表明导入 patcher 和 GreenPool 或 Monkey_patch 语句存在问题。注释掉 patcher.monkey_patch(all = True) 使其再次“工作”。

有谁知道在我的情况下可以使用的替代 Monkey_patch 语句是什么?

最佳答案

您有一个明确的事件循环来监视文件更改。这会阻止 eventlet 事件循环运行。您有两个选择:

  • 将阻塞调用(例如 win32event.WaitForSingleObject())封装在 eventlet.tpool.execute()
  • 在阻塞调用之前/之后运行 eventlet.sleep() 并确保不会阻塞太长时间。

eventlet.monkey_patch(thread=False) 是将所有其他模块列为 true 的更短替代方案。通常,当使用锁或线程本地存储或线程 API 生成绿色线程时,您需要 thread=True。如果您真正使用操作系统线程(例如有趣的 GUI 框架),您可能需要 thread=False

您不应该真正考虑在 Windows 上使用 Eventlet 来运行重要项目。与 POSIX 相比,性能要差很多。从 0.17 开始我就没有在 Windows 上运行测试。这是为了便于在流行的桌面平台上进行开发。

关于python - 由于 eventlet Monkey_patch,APScheduler 任务未触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41326156/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com