gpt4 book ai didi

java - 在 RxJavaobserveOn 调度程序线程上查询

转载 作者:行者123 更新时间:2023-11-30 06:46:25 26 4
gpt4 key购买 nike

我必须根据传入的请求写入文件。由于多个请求可能同时出现,我不希望多个线程尝试一起覆盖文件内容,这可能会导致丢失一些数据。

因此,我尝试使用 PublishSubject 实例变量收集所有请求的数据。我在 init 期间订阅了 publishSubject,并且此订阅将在应用程序的整个生命周期中保留。此外,我还在一个单独的线程(由 Vertx 事件循环提供)上观察同一个实例,该线程调用负责写入文件的方法。

private PublishSubject<FileData> publishSubject = PublishSubject.create();

private void init() {
publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}

稍后在请求处理期间,我调用 onNext 如下:

handleRequest() {
//do some task
publishSubject.onNext(fileData);
}

据我所知,当我调用onNext时,数据将排队,由observeOn运算符分配的特定线程写入文件。然而,我想要理解的是

  1. 该线程是否仅为此被阻塞在 WAITING 状态任务?或者,
  2. 没有文件时也会用于其他 Activity 吗写作发生?我不想因为采用这种方法而导致 vertx 事件循环中的一个线程浪费在等待状态。另外,如果有更好的方法,请提出建议。

提前致谢。

最佳答案

实际上 RxJava 会为你做这件事,根据定义 onNext() 发射将以串行方式起作用:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. (Observable Contract)

因此,只要您在订阅者的 onNext() 内运行阻塞调用(并且不会手动将工作 fork 到不同的线程),就可以了,并且不会出现并行写入发生。

其实,你的担忧应该来自相反的方向——背压。
您应该在这里选择您的反压策略,就好像请求来得更快,然后您将处理它们(写入文件),您可能会溢出缓冲区并陷入麻烦。 (考虑使用 Flowable 并根据您的需求选择背压策略。

关于您的问题,这取决于调度程序,您使用的是 RxHelper.blockingScheduler(vertx) 这似乎是您的自定义代码,所以我无法判断调度程序是否使用共享线程以工作队列方式运行,那么它就不会保持空闲状态。
不管怎样,Rx 不会为你决定这一点,调度程序的职责是根据其逻辑将工作分配给某个线程。

关于java - 在 RxJavaobserveOn 调度程序线程上查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43610483/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com