- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
TL;DR 我正在寻求帮助来实现下面的弹珠图。目的是尽可能对未排序的值进行排序,而无需在扫描执行之间等待时间。
我不是要求完整的实现。欢迎任何指导。 我有一个无限热可观察对象的异步慢速(出于测试目的而强制)扫描。这是相关代码:
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
def slow_scan_msg(state, msg):
sleep(0.4)
return state \
._replace(count = state.count + 1) \
._replace(last_msg = msg)
这是完整版:https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd
这是当前输出(值是随机生成的):
emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 last_msg.counter=3 ts=13.22
SLOW st.count=4 last_msg.counter=4 ts=5.71
SLOW FINISHED
我想在扫描执行之间对未决消息进行排序。因此,第一个发出的消息将始终是第一个被消费的消息,但下一个消费的消息将是在那一点之前发出的和未消费的消息的最小值(所有这些都在当前版本中,因为即时发出)。等等……我认为大理石图比我的解释更好。
请注意,扫描不是在等待完成事件,它在发出最后一条消息后没有开始的唯一原因是因为 sleep 。 Here you have another version其中 sleep 已从扫描中删除并放入 ExternalDummyService。您可以看到值在发出时就被消耗掉了。大理石图中也显示了这一点。
我试过 to_sorted_list ,我在 RxPy 中找到的唯一排序方法,但我无法使其工作。
我正在寻找的是这样的:
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
############ buffered_sort() does not exist
.buffered_sort(lambda msg: msg.timestamp) \
############
.scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
谢谢
最佳答案
如果您想使用to_sorted_list
,您需要重新映射您在单个元素中获得的列表。将 main
函数更改为:
def main():
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.to_sorted_list(key_selector=lambda msg: msg.timestamp) \
.flat_map(lambda msglist: Observable.from_iterable(msglist)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
给出:
>emitting Msg(count=0, timestamp=18.924474716186523)
>emitting Msg(count=1, timestamp=4.669189453125)
>emitting Msg(count=2, timestamp=18.633127212524414)
>emitting Msg(count=3, timestamp=15.151262283325195)
>emitting Msg(count=4, timestamp=14.705896377563477)
>SLOW st.count=0 last_msg.counter=1 ts=4.67
>SLOW st.count=1 last_msg.counter=4 ts=14.71
>SLOW st.count=2 last_msg.counter=3 ts=15.15
>SLOW st.count=3 last_msg.counter=2 ts=18.63
>SLOW st.count=4 last_msg.counter=0 ts=18.92
>SLOW FINISHED
请注意,to_sorted_list
方法将等待主题流的末尾开始扫描,因此您不能使用它来实现问题中显示的弹珠图。
要正确实现它,我认为您需要像 onBackpressureBuffer
这样的东西这是在 RxJava 中实现的,而不是在 RxPy 中实现的。
这不会完全解决问题,因为缓冲区是 FIFO(先进先出)并且您需要一种自定义方式来选择先发出的消息。这可能需要调整对缓冲区请求的处理方式。
您可能会找到一种更好的方法,通过名为 rxbackpressure 的 RxPy 扩展来解决问题。 , 特别是它的类 dequeuablebuffer.py您可以根据自己的需要进行调整。
关于python - RxPy : Sort hot observable between (slow) scan executions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49803754/
我已经使用 flutter_web 有一段时间了,从来没有真正质疑过它在按下“热重载”时总是重新启动整个应用程序,但自从现在 flutter_web 被合并到主要的 flutter channel 我
我正在使用 webpack-dev-server处于开发模式( watch )。每次服务器重新加载时,一些 json 和 js 文件都会挤满我的构建目录,如下所示:'hash'.hot-update.
我正在尝试让 React-hot-loader 3 与 React-hot-loader 3、React-router 4 和 Webpack-hot-middleware(最新版本,2.18.2)一
我正在尝试使用 Handsontable 版本 0.34.4CE/1.14.2 PRO 在 Handsontable (HOT-in-HOT) 中创建 Handsontable。根据此处提供的文档,一
使用one-hot encoding,一旦你有一个包含 1 个值的列,让我们说“color”,pandas get_dummies 将做如下: df = pd.DataFrame({'f1': ['r
鉴于这些是我正在使用的依赖项: "react-hot-loader": "3.0.0-beta.7", "webpack": "2.6.1", "webpack-dev-middleware": "^
我在我的输出目录中建立了一系列热加载器文件 (*.hot-loader.json)。如何确保此输出目录清除不必要的文件? 注意:我也在使用 Webpack。 最佳答案 使用 webpack-middl
我是机器学习和深度学习的新手。我想解决时间序列问题,该问题每秒都有数据。另外,我最近一直在研究word2vector和时间序列数据。有一天,我想到了一个想法,将日期时间等序列数据转换为 one-hot
我正在尝试让 React Hot Reloader 适用于我的 ReactJS 项目,但收到错误错误:找不到相对于目录的预设“react-hot”... 我确实在 .babelrc 中设置了预设“re
基于网络阅读、堆栈溢出,主要是 these articles关于与编码恐怖相关的数据库版本控制,我已经尝试编写一个计划来对一个有 8 年历史的 php mysql 网站的数据库进行版本控制。 Data
我正在尝试想出一种方法来确定某些帖子在论坛中的“热门”程度。你会使用什么标准,为什么?如何将这些结合起来得出热度分数? 我考虑的标准包括: 有多少回复 距离上次回复有多久 平均回复时间 该算法必须解决
我正在尝试复制 reddit's hot algortithm用于整理我的帖子。这是我的功能: def hot(self): s = self.upvotes baseScore =
先给大家展示下效果图,看看是不是在你的意料之中哈。 labelview是在github上一个开源的标签库。其项目主页是:https://github.com/linger1216//label
我的R代码有问题,而缺少值。实际上不知道如何使用简单的Hot Deck方法估算这些值。例如,拥有这些数据。 1 10000123 111 112820 0.24457235 NA
我正在研究有关Node.js的教程,网址为:http://www.johnpapa.net/get-up-and-running-with-node-and-visual-studio/ 我可以让该应
我有一个像这样的 CSV 文件 我想选择最后一列并使每个序列的字符级单热编码矩阵,我使用此代码但它不起作用 data = pd.read_csv('database.csv', usecols=[4]
我有一个包含混合字符串的列,我创建了列来表示字符串中的每个唯一字符。我需要用 [1,0] 对列进行编码如果字符串中的任何字符与这些列之一匹配。 library(data.table) d = data
当 Jetty 上有原生 Java 代码时,您可以执行热部署。 例如,这使您可以更改 servlet 代码,而无需重新启动服务器即可查看应用程序更改。 但是,如果您在 Java 之上运行脚本语言 -
假设我想从 Reddit 子版 block “新闻”流式传输帖子。然而,帖子非常频繁,我们不能说每个帖子都值得。所以我想通过尝试流式传输“热门”列表来过滤好帖子。但我不确定这是否可能,或类似的事情是否
这是我的服务器代码: if (process.env.NODE_ENV === 'development') { // Enable logger (morgan) a
我是一名优秀的程序员,十分优秀!