python - How to wait for RxPy parallel threads to complete

Reposted. Author: 太空狗. Updated: 2023-10-30 02:54:55

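Based on this excellent SO answer, I can get multiple tasks running in parallel in RxPy. My question is: how do you wait for all of them to complete? I know that with threads I could call .join(), but Rx Schedulers don't seem to have any such option. .to_blocking() doesn't help either: the MainThread finishes before all notifications have fired and the completed handler has been called. Here is an example: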

from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread

def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))

def intense_calculation(value):
    printthread("calc {}".format(value))
    time.sleep(random.randint(5, 20) * .1)
    return value

if __name__ == "__main__":
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    printthread("\nAll done")
    # time.sleep(2)

Expected output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread

Actual output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread

Actual output if I uncomment the sleep call

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
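
The outputs above appear to differ only in whether the main thread is still running when the callbacks fire. One general way to hold the main thread, independent of Rx, is to have the completion callback set a `threading.Event` and wait on it. The following is a minimal sketch using only the standard library: `start_background_work` and `collect_blocking` are hypothetical names, the former is a stand-in for the Rx subscription rather than anything RxPy provides, and this is not the approach used in the answer further down.

```python
import threading
import time

def start_background_work(values, on_next, on_completed):
    """Hypothetical stand-in for a subscription: emits values on a daemon thread."""
    def worker():
        for v in values:
            time.sleep(0.05)  # simulate some work per item
            on_next(v)
        on_completed()
    # daemon=True: this thread will not keep the process alive by itself
    threading.Thread(target=worker, daemon=True).start()

def collect_blocking(values, timeout=5.0):
    """Start the work, then block the caller until on_completed has fired."""
    received = []
    done = threading.Event()
    start_background_work(values, on_next=received.append, on_completed=done.set)
    finished = done.wait(timeout)  # False if the timeout elapsed first
    return finished, received

if __name__ == "__main__":
    finished, received = collect_blocking([1, 2, 3])
    print("finished:", finished, "received:", received)
```

In an Rx pipeline the same idea would presumably mean passing `done.set` as the `on_completed` handler (and setting the event in `on_error` as well) before calling `done.wait()` in the main thread.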

Best answer

Posting the complete solution here:

from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
from rx.concurrency import ThreadPoolScheduler

def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))

def intense_calculation(value):
    printthread("calc {}".format(value))
    time.sleep(random.randint(5, 20) * .1)
    return value

if __name__ == "__main__":
    # Run the calculations on an explicit pool instead of Scheduler.timeout
    scheduler = ThreadPoolScheduler(4)

    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=scheduler)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    printthread("\nAll done")
    # Executor.shutdown() defaults to wait=True, so this call blocks
    # until the pool's pending tasks have finished
    scheduler.executor.shutdown()
    # time.sleep(2)
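
The waiting in this solution comes from `scheduler.executor.shutdown()`. Assuming `scheduler.executor` is a standard `concurrent.futures.ThreadPoolExecutor` (which I believe is the case in RxPy 1.x, but treat that as an assumption), `shutdown()` defaults to `wait=True` and does not return until every submitted task has finished. Here is a small standard-library-only sketch of that behaviour, with no Rx involved; `slow_square` and `run_and_wait` are made-up names for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(value, results):
    """Simulated intense calculation that records its result."""
    time.sleep(0.1)
    results.append(value * value)

def run_and_wait(values):
    """Submit one task per value, then block until all of them are done."""
    results = []
    executor = ThreadPoolExecutor(max_workers=4)
    for v in values:
        executor.submit(slow_square, v, results)
    # wait=True is the default: returns only after all pending tasks complete
    executor.shutdown()
    return sorted(results)  # completion order is not guaranteed, so sort

if __name__ == "__main__":
    print(run_and_wait([1, 2, 3]))
```

Note that this only waits for work running on the pool itself. Callbacks delivered on another scheduler, such as the `observe_on(Scheduler.event_loop)` step above, are not tasks of this executor.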

Regarding python - How to wait for RxPy parallel threads to complete, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43989153/
