gpt4 book ai didi

python - 如何从标准输入等流中创建 rx.py Observable?

转载 作者:太空狗 更新时间:2023-10-30 02:45:21 24 4
gpt4 key购买 nike

我正试图了解用于函数式响应式(Reactive)编程 (FRP) 的 rxpy 库,但我已经遇到了障碍。我正在编写一个小程序,它希望数据通过标准输入 (sys.stdin) 流入。

因此,我的问题很简单:如何创建一个 rx.Observable 实例,该实例将从 stdin 异步读取?是否有内置机制从流中创建 Observable 实例?

最佳答案

我从未使用过RxPy,但我对RxJS略有了解。

RxPya number of built-in methods您可能会为此目的使用它,但我倾向于创建一个 Observable 工厂。以 ObservableCreation.from_array 作为我们的指南,让我们现在尝试一下。 (注意:我还没有运行这段代码,但它应该可以帮助你完成大部分工作)

from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler

class ObservableFile(Observable, metaclass=ObservableMeta):

@classmethod
def from_file(cls, readableFile, scheduler=None):
scheduler = scheduler or current_thread_scheduler

def subscribe(observer):
def action(action1, state=None):
try:
observer.on_next(readableFile.next())
action1(action)

except StopIteration: # EOF
observer.on_completed()

return scheduler.schedule_recursive(action)
return AnonymousObservable(subscribe)

然后像这样使用它:

res = rx.Observable.from_file(sys.stdin)

这将在标准输入的每一行上创建一个可观察对象,直到 EOF。阻塞了,但是有ways around that .它还可以使用不同的调度程序进行调整。

关于python - 如何从标准输入等流中创建 rx.py Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24994680/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com