gpt4 book ai didi

python - RxPY - flat_map 发射等待下一个生成器值

转载 作者:行者123 更新时间:2023-12-05 07:38:41 25 4
gpt4 key购买 nike

你好!我正在尝试完成我的第一个 RxPY 项目,但我遇到了一些问题
了解 Python 中 flat_map 的行为。

在这个项目中,有一个从生成器(Kafka 消费者)创建的 Observable。它在收到消息时发出值,然后根据消息执行查询,并为每个结果发出一个值。

我对代码做了一些修改,以使其更容易重现。 Kafka 消费者被替换为生成器,生成器在两次发射之间花费大量时间,查询结果被替换为一个发射 3 个值的 Observable。行为仍然相同。

from rx import Observable

generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator) \
.flat_map(lambda i: Observable.from_(['a', 'b', 'c'])) \
.subscribe(on_next=lambda i: print(i))

输出:

a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c

我期待这样的事情:

a
b
c
(...waits a long time...)
a
b
c

这种行为的原因是什么?我应该怎么做才能得到预期的结果?

谢谢! :)

最佳答案

最近遇到了与 flat_map 运算符相同的问题,ImmediateScheduler 在这里提供了帮助。

为 RxPy 3 更新了一些初始代码:

import rx
from rx.operators import flat_map


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
flat_map(
lambda i: rx.from_(['a', 'b', 'c'])
)
).subscribe(on_next=lambda i: print(i))

输出略有不同,但问题是一样的:

(... waits a long time ...)
a
b
c
a
b
c

为 flat_map 内部的可观察对象应用 ImmediateScheduler:

import rx
from rx.operators import flat_map
from rx.scheduler import ImmediateScheduler


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
flat_map(
lambda i: rx.from_(['a', 'b', 'c'], scheduler=ImmediateScheduler())
)
).subscribe(on_next=lambda i: print(i))

并得到了预期的结果:

a
b
c
(...waits a long time...)
a
b
c

关于python - RxPY - flat_map 发射等待下一个生成器值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47770914/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com