gpt4 book ai didi

python - RxPY 中带有 from_iterable/range 的 subscribe_on

转载 作者:太空宇宙 更新时间:2023-11-03 11:26:27 27 4
gpt4 key购买 nike

我正在尝试着手安排 python 的响应式扩展。我想使用 subscribe_on 并行处理多个可观察对象。如果使用 just 创建 observable,这会很好地工作,但如果使用 rangefrom_ 则不行。

just 默认为 Scheduler.immediate,而其他生成器默认为 Scheduler.current_thread。这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)


def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
.do_action(work)\
.subscribe_on(Scheduler.new_thread)\
.subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
.do_action(work) \
.subscribe_on(Scheduler.new_thread) \
.subscribe(finish)

它与 observe_on 一起工作,或者如果调度程序直接传递给生成器,但我想将可观察的创建与处理分离并实现这样的事情:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)


def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)


def factory_single():
return rx.Observable.just(1).do_action(work)


def factory_multiple():
return rx.Observable.range(2, 4).do_action(work)


def process(factory):
factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)

我是否误解了 subscribe_on?我的方法错了吗?

最佳答案

您的示例中有三个可以独立安排的操作:

  1. 数据馈送操作。 justrange 默认使用不同的调度器,但它们之间没有太大区别。两者都在当前线程上输入初始值。您可以通过将其作为参数传递给这些方法来覆盖它们的默认调度程序。

  2. 订阅操作。默认使用 Scheduler.current_thread。 IE。它与数据馈送操作在同一线程上执行。可以被 subscribe_on 方法覆盖。

  3. 观察(on_nexton_erroron_completed) Action 。默认使用 Scheduler.current_thread。 IE。它与订阅操作在同一线程上执行。可以被 observe_on 方法覆盖。

如果您仅为这些操作中的一个覆盖调度程序,则其他操作应如上所述进行。

关于调度器

Scheduler.immediate 并不真正安排任何事情。它立即在调度它的同一线程上调用操作。

Scheduler.current_thread 通过排队操作避免递归,但仍会在调度它的同一线程上调用操作。

Scheduler.new_thread 启动单个后台线程来一个接一个地执行操作。

Scheduler.timeout 为它需要执行的每个操作启动新的后台线程。

尝试并行处理

在不同线程中安排工作最合适的方法似乎是observe_on

问题是目前 RxPy 中没有 thread_pool 调度器。 new_thread 调度程序只启动一个线程,因此对您帮助不大。

timeout 调度程序可用于并行运行,但它无法控制并发线程的数量,因此并发任务数量的爆炸性增长可能会溢出内存并导致系统崩溃。

不是 observe_on 中的错误

我尝试使用 observe_on(Scheduler.timeout) 运行您的示例,但任务仍然没有并行进行。在查看 RxPy 源代码后,我发现它仅在当前事件完成后才安排下一个事件,这有效地禁用了并行处理。我的第一 react 是报告bugobserve_on 实现中。

但经过进一步调查,我发现串行执行不是错误,而是 intended behavior .

并行执行任务的正确方法

这是有效的代码(基于 this answer ):

Observable.range(1, 3) \
.select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
.observe_on(Scheduler.event_loop) \
.subscribe(finish)

Observable.start 创建异步 observable,它通过 Scheduler.timeout 在单独的线程上调度。

observe_on(Scheduler.event_loop) 是可选的。它强制在同一线程上调用所有项目的 finish 方法。

请注意,不能保证 finish 方法在初始 range 顺序中被调用。

关于python - RxPY 中带有 from_iterable/range 的 subscribe_on,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32450932/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com