- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个外部服务 (ExternalDummyService),我在其中注册了一个回调。我想从该回调创建一个可观察对象并订阅多个异步进程。
pyfiddle 中的完整代码:https://pyfiddle.io/fiddle/da1e1d53-2e34-4742-a0b9-07838f2c13df* 请注意,在 pyfiddle 版本中,“ sleep ”被替换为“for i in range(10000): foo += i”,因为 sleep 无法正常工作。
主要代码是这样的:
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs.subscribe(slow_process)
external_obs.subscribe(fast_process)
external_obs.connect()
thread.start()
class ExternalDummyService(Thread):
def __init__(self):
self.subject = Subject()
def run(self):
for i in range(5):
dummy_msg = { ... }
self.subject.on_next(dummy_msg)
def fast_process(msg):
print("FAST {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
sleep(0.1)
def slow_process(msg):
print("SLOW {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
sleep(1)
我得到的输出是这个,两个进程同步运行,ExternalDummyService 在两个进程完成每次执行之前不会发出新值:
emitting 0
STARTED
SLOW 0 1.0008811950683594
FAST 0 2.0122528076171875
emitting 1
SLOW 1 1.5070438385009766
FAST 1 1.5070438385009766
emitting 2
SLOW 2 0.5052089691162109
FAST 2 0.9891986846923828
emitting 3
SLOW 3 1.0006427764892578
FAST 3 1.0006427764892578
emitting 4
SLOW 4 1.0013580322265625
FAST 4 1.0013580322265625
FINISHED
我想要这样的东西,服务发出消息而不等待进程运行并且进程异步运行:
STARTED
emitting 0
emitting 1
emitting 2
FAST 0 2.0122528076171875
FAST 1 1.5070438385009766
emitting 3
SLOW 0 1.0008811950683594
FAST 2 0.9891986846923828
emitting 4
FAST 3 1.0006427764892578
SLOW 1 1.5070438385009766
FAST 4 1.0013580322265625
SLOW 2 0.5052089691162109
SLOW 3 1.0006427764892578
SLOW 4 1.0013580322265625
FINISHED
我尝试过使用 share()、ThreadPoolScheduler 和其他我不知道我在做什么的东西。
谢谢!
最佳答案
使用这个问题的答案:RxJava concurrency with multiple subscribers and events
...我用这段代码达到了预期的结果:
optimal_thread_count = cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
.subscribe(fast_process)
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
.subscribe(slow_process)
external_obs.connect()
thread.start()
完整版:https://pyfiddle.io/fiddle/20f8871c-48d6-4d6b-b1a4-fdd0a4aa6f95/?m=Saved%20fiddle
输出是:
emitting 0
emitting 1
emitting 2
emitting 3
emitting 4
FAST 0 52.629709243774414
FAST 1 51.12814903259277
FAST 2 100.2051830291748
FAST 3 151.2434482574463
SLOW 0 503.0245780944824
SLOW 1 502.0263195037842
FAST 4 548.7725734710693
SLOW 2 551.4400005340576
SLOW 3 652.1098613739014
SLOW 4 1000.3445148468018
请随时提出任何改进建议。
关于python - RxPy : How to create hot observable from external callback and subscribe multiple asynchronous processes?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49797475/
简介: 你好。我正在探索 python rxpy用于我的用例的库 - 我正在使用响应式(Reactive)编程概念构建执行管道。通过这种方式,我希望我不必操纵太多状态。虽然我的解决方案似乎很实用,但是
所以我在学习 RxJava 和 RxKotlin 两年后正在学习 RxPy。我注意到的一件事是某些运算符会导致疯狂的交错,而这在 RxJava 中不会发生。 例如,对于简单的 Observable 源
我正在学习 RxPY ,所以我需要编写一些代码来将每个单词按第一个字符拆分。结果必须如下所示: {'a': ['a'], 't': ['the','the'], 'l': ['low','lazy']
你好!我正在尝试完成我的第一个 RxPY 项目,但我遇到了一些问题 了解 Python 中 flat_map 的行为。 在这个项目中,有一个从生成器(Kafka 消费者)创建的 Observable。
我需要创建一个 Observable 流,它定期发出异步协程的结果。 intervalRead是一个函数,它返回一个 Observable,并将区间 rate 作为参数。和一个异步协程函数 fun ,
如何从 RxPy 中的 Observable 序列中恢复元素 obs = Observable.from_([1,2,3]) print obs.first() 应该打印 1,但它返回另一个 Anon
基于此excellent SO answer我可以让多个任务在 RxPy 中并行工作,我的问题是你如何等待它们全部完成?我知道使用线程我可以执行 .join() 但 Rx Schedulers 似乎没
我正在尝试将 python ReactiveX 流(使用 RxPy 库)发送到 Web UI 组件上的 javascript,但我似乎找不到这样做的方法。此外,我可能需要将进入 Javascript
RxPy 是否有一个方便的 flatten 运算符,相当于 flat_map(identity)?或者将 flat_map 的 selector 默认为 identity? 最佳答案 来自Dag Br
我遇到了一个错误,它意外地将 Observable 用作可迭代对象。对于大多数对象,这通常很容易检测到: >>> tuple(object()) Traceback (most recent call
在函数中将 rx.Observable 对象转换为“普通”对象的优雅方法是什么? 例如: def foo(): return rx.Observable.just('value').subsc
我的设置: 我有一个(时间,价格)元组中的股票价格数据列表: from datetime import datetime prices = [(datetime(2015, 1, 9), 101.9)
(注意:这个问题的背景非常冗长,但底部有一个 SSCCE 可以跳过) 背景 我正在尝试开发一个基于 Python 的 CLI 来与 Web 服务进行交互。在我的代码库中,我有一个 Communicat
我有两个事件流。一个来自电感回路,另一个是网络摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相差 N 毫秒以内(汽车总是首先进入循环),我想将它们组合起来,但我也希望每个流中的不匹配事件(硬件都可
我跟着这个great tutorial使用 tweepy 在 Python 中利用实时 Twitter 流。这将实时打印提及 RxJava、RxPy、RxScala 或 ReactiveX 的推文。
我正在尝试着手安排 python 的响应式扩展。我想使用 subscribe_on 并行处理多个可观察对象。如果使用 just 创建 observable,这会很好地工作,但如果使用 range 或
TL;DR 我正在寻求帮助来实现下面的弹珠图。目的是尽可能对未排序的值进行排序,而无需在扫描执行之间等待时间。 我不是要求完整的实现。欢迎任何指导。 我有一个无限热可观察对象的异步慢速(出于测试目的而
我有一个外部服务 (ExternalDummyService),我在其中注册了一个回调。我想从该回调创建一个可观察对象并订阅多个异步进程。 pyfiddle 中的完整代码:https://pyfidd
我是一名优秀的程序员,十分优秀!