- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我的场景是这样的:
我当前的解决方案:让所有工作人员通过另一个队列回答哪些任务已完成,并引入必须完成任务的截止日期。如果达到截止日期,则重置任务并重新排队。这存在解决方案“软”的问题,即截止日期是任意的。
我正在寻找最简单的解决方案。是否有更简单或更严格的解决方案?
最佳答案
此解决方案使用三个队列来跟踪工作(模拟为 WORK_ID
):
todo_q
:任何要完成的工作(包括如果进程在运行中死亡则要重做的工作)start_q
:任何已经被进程启动的工作finish_q
:任何已经完成的工作使用这种方法你不需要定时器。只要您分配一个进程标识符并跟踪分配,检查是否 Process.is_alive()
。如果进程终止,则将该工作添加回待办事项队列。
在下面的代码中,我模拟了一个工作进程在 25% 的时间内死亡...
from multiprocessing import Process, Queue
from Queue import Empty
from random import choice as rndchoice
import time
def worker(id, todo_q, start_q, finish_q):
"""multiprocessing worker"""
msg = None
while (msg!='DONE'):
try:
msg = todo_q.get_nowait() # Poll non-blocking on todo_q
if (msg!='DONE'):
start_q.put((id, msg)) # Let the controller know work started
time.sleep(0.05)
if (rndchoice(range(3))==1):
# Die a fraction of the time before finishing
print "DEATH to worker %s who had task=%s" % (id, msg)
break
finish_q.put((id, msg)) # Acknowledge work finished
except Empty:
pass
return
if __name__ == '__main__':
NUM_WORKERS = 5
WORK_ID = set(['A','B','C','D','E']) # Work to be done, you will need to
# name work items so they are unique
WORK_DONE = set([]) # Work that has been done
ASSIGNMENTS = dict() # Who was assigned a task
workers = dict()
todo_q = Queue()
start_q = Queue()
finish_q = Queue()
print "Starting %s tasks" % len(WORK_ID)
# Add work
for work in WORK_ID:
todo_q.put(work)
# spawn workers
for ii in xrange(NUM_WORKERS):
p = Process(target=worker, args=(ii, todo_q, start_q, finish_q))
workers[ii] = p
p.start()
finished = False
while True:
try:
start_ack = start_q.get_nowait() # Poll for work started
## Check for race condition between start_ack and finished_ack
if not ASSIGNMENTS.get(start_ack[0], False):
ASSIGNMENTS[start_ack[0]] = start_ack # Track the assignment
print "ASSIGNED worker=%s task=%s" % (start_ack[0],
start_ack[1])
WORK_ID.remove(start_ack[1]) # Account for started tasks
else:
# Race condition. Never overwrite existing assignments
# Wait until the ASSIGNMENT is cleared
start_q.put(start_ack)
except Empty:
pass
try:
finished_ack = finish_q.get_nowait() # Poll for work finished
# Check for race condition between start_ack and finished_ack
if (ASSIGNMENTS[finished_ack[0]][1]==finished_ack[1]):
# Clean up after the finished task
print "REMOVED worker=%s task=%s" % (finished_ack[0],
finished_ack[1])
del ASSIGNMENTS[finished_ack[0]]
WORK_DONE.add(finished_ack[1])
else:
# Race condition. Never overwrite existing assignments
# It was received out of order... wait for the 'start_ack'
finish_q.put(finished_ack)
finished_ack = None
except Empty:
pass
# Look for any dead workers, and put their work back on the todo_q
if not finished:
for id, p in workers.items():
status = p.is_alive()
if not status:
print " WORKER %s FAILED!" % id
# Add to the work again...
todo_q.put(ASSIGNMENTS[id][1])
WORK_ID.add(ASSIGNMENTS[id][1])
del ASSIGNMENTS[id] # Worker is dead now
del workers[id]
ii += 1
print "Spawning worker number", ii
# Respawn a worker to replace the one that died
p = Process(target=worker, args=(ii, todo_q, start_q,
finish_q))
workers[ii] = p
p.start()
else:
for id, p in workers.items():
p.join()
del workers[id]
break
if (WORK_ID==set([])) and (ASSIGNMENTS.keys()==list()):
finished = True
[todo_q.put('DONE') for x in xrange(NUM_WORKERS)]
else:
pass
print "We finished %s tasks" % len(WORK_DONE)
在我的笔记本电脑上运行这个...
mpenning@mpenning-T61:~$ python queueack.py
Starting 5 tasks
ASSIGNED worker=2 task=C
ASSIGNED worker=0 task=A
ASSIGNED worker=4 task=B
ASSIGNED worker=3 task=E
ASSIGNED worker=1 task=D
DEATH to worker 4 who had task=B
DEATH to worker 3 who had task=E
WORKER 3 FAILED!
Spawning worker number 5
WORKER 4 FAILED!
Spawning worker number 6
REMOVED worker=2 task=C
REMOVED worker=0 task=A
REMOVED worker=1 task=D
ASSIGNED worker=0 task=B
ASSIGNED worker=2 task=E
REMOVED worker=2 task=E
DEATH to worker 0 who had task=B
WORKER 0 FAILED!
Spawning worker number 7
ASSIGNED worker=5 task=B
REMOVED worker=5 task=B
We finished 5 tasks
mpenning@mpenning-T61:~$
我以 25% 的死亡率对 10000 多个工作项进行了测试。
关于python - 恢复丢失的 multiprocessing.Queue 项目,当工作进程死亡时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8532215/
有什么方法可以恢复删除的元素吗? 这是我删除元素的代码 myFunction() { var width = window.innerWidth; var February = doc
我有一个 TokuDB 表,由于某种原因缺少 ***_status.tokudb 文件。 我还不确定文件是否由于 TokuDB 崩溃而丢失。 问题是: 有没有办法从主要文件和关键文件(我可以从 tok
我正在 Windows 7 (x86) 上运行带有 Workbench 6.3.8 的 32 位 MySQL Server 5.7.22 本地实例(必须选择 32 位版本 - 所以,较旧的版本)。 我
1、备份 <% SQL="backup database 数据库名 to disk='"&Serve
1、ASP中怎么实现SQL数据库备份、恢复! 答:asp在线备份sql server数据库: 1、备份 <% SQL="ba
我在 R 中使用 stats::filter 函数来理解 R 中的 ARIMA 模拟(如在函数 stats::arima.sim 中)和估计。我知道 stats::filter 将线性过滤器应用于向量
我已经浏览了示例应用程序的文档和代码,并发现 files/objectbox/objectbox/data.mdb 是存储所有数据的默认文件。 假设我的理解是正确的,我有几个问题找不到文档: 我想在我
为了恢复非续订订阅类型的 InAppPurchase,我已经实现了服务器来处理此问题。 但在购买过程中,iTunes 有时不会要求用户验证他们的卡详细信息, 在这种情况下,它会在后台发送应用程序并显示
我的问题是寻找cocos2d游戏期间暂停/恢复状态(包括所有需要保存的数据信息)的设计解决方案。 包括但不限于以下情况: 1).用户选择退出,然后弹出一个对话框供用户选择“直接退出”、“暂停”; 2)
在 Mercurial 中,我有一个旧的变更集,除了对单个文件的更改外,它都很好。我将如何恢复对该单个文件的更改? 即使只是能够在上一个变更集中查看文件的状态也会很好,然后我可以剪切和粘贴。 我的 M
我的一项职能遇到了困难。我想做的是计时器在页面加载后立即启动,并且只有一个带有启动/恢复的按钮。我无法在代码中找出需要更改功能的位置。有人可以帮助我吗?谢谢! HTML: , Javascr
我正在阅读Scrap your type classes 。这为类型类提供了替代方案。然而,我被Paul Chiusano的评论所困扰。其中讨论了恢复 do 表示法 语法。 坦白说,我无法理解 ret
当 OrientDB 因某人重新启动机器而非正常关闭时,OrientDB 最终会处于数据恢复失败的状态。对于如何从这种不正常的关闭中正常恢复有什么建议吗?我们正在寻找系统在断电期间能够自行恢复的方法。
我正在构建一个 Electron 应用程序,如果发生崩溃,它必须重新加载渲染进程窗口。 目前我可以从主进程重新启动应用程序 app.relaunch(); app.quit(); 但我无法检测到窗口崩
我有 3 个 Activity ,比如说 MainActivity、 Activity 2 和 Activity 3。 在 MainActivity 中,我有一个按钮(开始/停止),当我单击此按钮时,
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 11 年前。 Improve thi
Twilio 是否支持暂停和恢复内容播放。换句话说,我有相当长的文件将播放给调用者,并且我正在尝试找到一种方法来实现暂停和恢复功能。在播放某些内容的过程中,我希望用户能够按数字暂停,然后再次按数字从音
我已经提交了 A、B、C、D 和 E。我意识到在提交 B 中发生了一些非常糟糕的事情,所以我想回到 A,这次正确地进行之前搞砸了 B 的更改,然后重新应用 C 、 D 和 E 自动。 您可能想知道为什
我的一个文件被“标记为文本”,图标也发生了变化。实际上这是一个 PHP 文件。我尝试过使用 Help -> Find Action -> Mark As 尝试将其恢复为 PHP 突出显示,但它不起作用
我有一些 SSE 程序,可以将循环中的内存归零,当指针未对齐时,它会引发 SIGSEGV进入我的处理程序。我可以在此类处理程序中获取更多信息吗例行公事,现在我不知道它是在哪里完成的,我也可以吗以某种可
我是一名优秀的程序员,十分优秀!