gpt4 book ai didi

python - 集成响应式扩展和扭曲的基本示例?

转载 作者:行者123 更新时间:2023-11-28 16:29:41 25 4
gpt4 key购买 nike

我正在寻找一个关于如何使用响应式扩展 (RxPY) 和 Twisted 的非常基本的示例。这是一个使用 Twisted 流式传输消息的最小问候应用程序。

def hello():
print 'Hello from the reactor loop!'
print 'Lately I feel like I\'m stuck in a rut.'

from twisted.internet import reactor

reactor.callWhenRunning(hello)

print 'Starting the reactor.'
reactor.run()

我想使用 RxPY 库连接到这些流(如果这样更容易,它们不必打印到屏幕上),并执行映射、过滤等规范操作...

我能找到的所有 RxPY 示例要么生成它们自己的流,例如从一个可迭代的,以下代码流整数 0-9:

xs = Observable.from_(range(10))
xs.map(
lambda x: x * 2
).subscribe(print)

或者包含在更复杂的示例中(比如子类化 WebSocket Handler)。知道如何拦截打印消息吗? EG,从 Twisted react 堆生成一个可观察流?

最佳答案

我一直在努力弄清楚同样的事情。这是我的发现。

启动 RX 数据流的最简单方法是创建 Subject(),它结合了 ObserverObservable。然后,您可以使用 on_next 方法将数据输入其中。

接收部分:

from rx.subjects import Subject
subject=Subject()
subject.filter(...).map(...).subscribe(my_observer)

从你扭曲的回调中,你只需做:

subject.on_next(data_item)
...
subject.on_completed()

或者您可以发出错误信号:

subject.on_error(my_error)

在学习 RxPy 的过程中,我编写了 simple web server using both RxPy and Twisted .它是单个python文件,可以用作示例。

关于python - 集成响应式扩展和扭曲的基本示例?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33217179/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com