- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有两个事件流。一个来自电感回路,另一个是网络摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相差 N 毫秒以内(汽车总是首先进入循环),我想将它们组合起来,但我也希望每个流中的不匹配事件(硬件都可能失败)全部合并到一个流中。像这样:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
现在我当然可以通过使用好的 ole Subject 反模式来解决问题:
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
这不仅相当棘手,而且尽管我没有观察到它,但我很确定当我使用 threading.Timer
检查挂起队列时存在竞争条件。考虑到过多的 rx 运算符,我很确定它们的某种组合可以让您在不使用 Subject
的情况下执行此操作,但我无法弄清楚。如何做到这一点?
尽管出于组织和操作原因,我更愿意坚持使用 Python,但我会采用 JavaScript rxjs 答案并移植它,甚至可能在节点中重写整个脚本。
最佳答案
您应该能够使用 auditTime
和 buffer
解决问题。像这样:
function matchWithinTime(a$, b$, N) {
const merged$ = Rx.Observable.merge(a$, b$);
// Use auditTime to compose a closing notifier for the buffer.
const audited$ = merged$.auditTime(N);
// Buffer emissions within an audit and filter out empty buffers.
return merged$
.buffer(audited$)
.filter(x => x.length > 0);
}
const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));
setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
如果 b
值可能紧跟在 a
值之后,而您不希望它们匹配,则可以使用更具体的审计,如下所示:
const audited$ = merged$.audit(x => x === "a" ?
// If an `a` was received, audit upcoming values for `N` milliseconds.
Rx.Observable.timer(N) :
// If a `b` was received, don't audit the upcoming values.
Rx.Observable.of(0, Rx.Scheduler.asap)
);
关于python-3.x - 如何使用 rxpy/rxjs 延迟事件发射?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50395441/
我正在尝试使用 DynamicMethod 并尝试使用 IL 来创建一些对象。我想创建以下非常基本的对象: new Queue(new List{100}); 我已经使用 ILDASM 查看生成此代码
这是我stackoverflow的第一个问题!我有一个显示 mpl 图 Canvas 的 PyQT gui。我已将主轴周围的边距设置为 0,因此绘图将完全填满图形 Canvas 和包含它的小部件。问题
我的环境是 Windows 7,安装了 scala 2.11.4(运行良好),Java 1.8 我已经尝试过 spark-1.2.0-bin-hadoop2.4 和 spark-1.2.1-bin-h
对于我的生活,我无法弄清楚为什么我不能发送或捕获一些数据。 toggleNavigation() 触发,但我不确定 .emit() 是否真的在工作。 最终我想折叠和展开导航,但现在我只想了解如何将数据
我试图在 VUE 3 中传递一个 emit prop,每次传递它时我仍然得到 false,并且 prop 无法切换。 Accordion .vue
我有一个 View 模型,它采用初始 ViewState对象并具有可公开访问的 state可以收集的变量。 class MyViewModel(initialState: ViewState) : V
现在在玩 RxJava,偶然发现了以下问题: 我有 2 个不同的流: 带有项目的流 Stream(只有 1 个项目),它发出第一个流的转换信息。 所以基本上我有项目流,我希望所有这些项目与第二个流中的
我有一个 API 登录服务,它使用 http 服务来执行登录逻辑(LoginApiService、login-api.service.ts): login(data: LoginCredentials
我们有微服务架构,我们通过网络进行服务间调用。我们在顶层服务中使用 RxJava,这会导致向底层服务创建大量并行请求。因此,我收到“没有到主机的路由错误”或“连接错误”。为此,我想减慢 RxJava
Vue.component('rating-edit', { template:` {{rating.remark}} Sav
我最近购买了 Dream Cheeky Thunder 导弹发射器,我希望通过我的树莓派来控制它。 使用来自报复的代码(https://raw.githubusercontent.com/codeda
我制作了这段代码来记录发送到我的机器人的 DM: client.on('messageCreate', async message => { if (message.author.bot) r
我需要从服务器代码、路由器/ Controller 上的任何位置发出来自服务器的套接字。我检查了一些线程和谷歌,但没有按预期工作。 app.js var app = require('express'
我是一名优秀的程序员,十分优秀!