gpt4 book ai didi

python - 分区的 Observable 第二个流从未到达

转载 作者:行者123 更新时间:2023-12-01 09:09:42 25 4
gpt4 key购买 nike

我有一个 Observable 处理 Web 请求,我想在单独的流中处理成功或失败,与 this example 非常相似。我的脚本和示例之间的主要区别是我不想合并流然后订阅。我正在使用 RxPY 1.6.1 和 Python 2.7。

request = Observable.of(requests.get(self.URL, params=request_params))

request_success, request_failed = request.partition(lambda r: r.status_code == requests.codes.ok)

request_failed.subscribe(lambda r: print_msg('failure!'))
request_success.subscribe(lambda r: print_msg('success!'))

当请求失败时,脚本将按预期打印 failure!。但是,当响应正常时,脚本不会打印 success!。有趣的是,当您切换订阅顺序时,确实会打印 success!,而永远不会达到 failure!

我想也许 request 无法多播,所以我尝试将 publish() 添加到 request observable 并调用 connect () 创建订阅后。这没有帮助(所以我将其排除在上面的最小示例之外)。

我错过了什么?

最佳答案

将您的代码与 the unit tests that RxPy has for the partition operator 进行比较看起来代码几乎是正确的。

您的方向是正确的,您确实需要将请求 Observable 转换为多播 Observable。

Here is working code (tested on Repl.it, you will have to convert the list of requests back to the classes/objects you're using in your code) :

from rx import Observable

def print_msg(message):
print(message)

class Request(object):
def __init__(self, status_code):
self.status_code = status_code

request = Observable.of(
Request(200),
Request(404),
Request(412),
Request(200),
).publish()

request_success, request_failed = request.partition(lambda r: \
r.status_code == 200)

request_success.subscribe(lambda r: print_msg('success!'))
request_failed.subscribe(lambda r: print_msg('failure!'))
request.connect()

请注意,一旦请求列表转换为 Observable,它就会被发布 (Observable.of(...).publish()),并且仅在之后 我们订阅分区的可观察量,我们称之为连接。

输出是:

success!
failure!
failure!
success!

关于python - 分区的 Observable 第二个流从未到达,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51763447/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com