- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要一种功能,允许将消息异步推送到我的 PublishSubject
并通过 ConnectableObservable
以一定的速度(实际上是一个接一个)处理它们。不幸的是,在底层的Subscriber
处理消息之前,对PublishSubject
的onNext
的调用似乎不会被释放。
处理每条消息需要花费几秒钟的时间,并且在 Debug模式下,我看到它在调用将消息推送到 PublishSubject 的方法从堆栈中删除之前执行 - “推送后...”
始终出现在控制台中订阅者
内的内部日志之后...
所以我有这个 RestEndpoint:
@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Message metadata = processor.apply(extId);
log.info("Before push...");
dataImporter.pushData(metadata);
log.info("After push...");
} catch (Exception e) {
e.printStackTrace();
}
}
});
return Response.ok("Request received successfully").build();
}
这是 DataImporter 的构造函数:
public DataImporter(final String configFile) {
dataToImportSubject = PublishSubject.create();
dataToImportObservable = dataToImportSubject.publish();
dataToImportObservable.connect();
dataToImportObservable
.onBackpressureBuffer(1, new Action0() {
@Override
public void call() {
logger.debug("Buffer full...");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Message>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
logger.error("Error importing "+e.getMessage());
}
@Override
public void onNext(Message value) {
request(1);
importResult(configFile, value);
}
@Override
public void onStart() {
request(1);
}
});
}
然后 DataImporter 的 pushData
只是推送到 PublishSubject
的 onNext
方法..:
public void pushData(Message metadata) {
dataToImportSubject.onNext(metadata);
}
这是 PublishSubject
和 ConnectableObservable
的声明:
public class DataImporter implements ImporterProxy{
private final PublishSubject<Message> dataToImportSubject;
private final ConnectableObservable<Message> dataToImportObservable;
最佳答案
PublishSubject
在原始 onXXX
调用的线程上向其使用者发送:
Scheduler:
PublishSubject
does not operate by default on a particularScheduler
and theObserver
s get notified on the thread the respectiveonXXX
methods were invoked.
您必须使用observeOn将处理移动到其他线程,因为observeOn可以将onXXX调用移动到另一个线程。
subscribeOn
一般对 Subject
没有任何实际作用,因为它只影响订阅线程,不会调制后续的 onXXX
调用这些主题。
关于java - RxJava : PublishSubject acts synchronously,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51160353/
我正在处理这样的情况,我想在这里应用 RxSwift。 我有带有按钮的.xib UIView。 class RightButtonItemView: UIView { @IBOutlet we
我在 Playground 上玩 RxSwift 时遇到了警告。这是完整的警告消息: Synchronization anomaly was detected. - Debugging: To deb
为什么subscribe在这里从不打印任何内容?只是出于好奇。无论如何,这是一个坏习惯,我会改用observeOn,但是我无法弄清楚为什么从来没有达到subscribe ... fun main()
我正在尝试使用 RxJava 创建模型的代理,该代理允许其他人订阅模型中的更改。 这是我的代理的简化版本: class MyModelProxy { private static MyMode
我尝试使用 rx 和 PublishSubject .所以我创建对象: PublishSubject events = PublishSubject.create(); 在我的服务中,我输入了 MyE
PublishSubject 非常适合在观察者和可观察对象之间架起一座桥梁: Observable observable = Observable.just("string"); Pu
我知道,所有类型的 Rx 主题都可以获得订阅中未正确排序的元素,例如。如果我按 1、2、3 的顺序发送三个元素,则可以选择按以下顺序获取它:1、3、2。 我想知道,有没有办法强制发射元素在开始和结束时
我正在努力处理包含 RxSwift 的 PublishSubject 的特定用例。 为了简单起见,省略了不重要的细节。 有一个 MVVM 设置。在 VC 中,我有一个 UIButton,点击它应该发送
我有一个 View Controller ,它打开一个模态视图 Controller 供用户从其库中选择图像。为此,我使用了我编写的 DKImagePickerController 的 Rx 包装器。
我正在努力了解以下黄金法则(如果有的话): When to use BehaviorSubject ? 和 When to use PublishSubject ? 它们之间的区别很明显 There
谁能解释一下为什么 PublishSubject 不能很好地与 firstOrError() 一起工作? 我希望 firstOrError 在创建没有任何值的 PublishSubject 时返回 N
我正在尝试使用 PublishSubject 来转发按钮点击。但是,此 PublishSubject 会在初始化时触发,这会干扰我的逻辑。这就是我正在做的: var buttonClick = Pub
我将 Button pressed 绑定(bind)到路由器中的 PublishSubject,如下所示: hambugerButton .rx_tap .bindTo(router.op
我有一个已注册 doOnSubscribe 和 doOnUnsubscribe 操作的 PublishSubject。如果完成订阅,则不会调用这两个操作。 private PublishSubject
我在学习Viper带 RxSwift . 我想通知我的Presenter那viewDidLoad在我的 ViewController 中被调用. 为此,我有以下内容: class LoginPrese
我正在尝试测试我的 ViewModel 的主要功能。重要的步骤是测试加载状态完成。但可以肯定的是,为了更好的测试,测试所有状态可能会很有趣。 我正在阅读很多关于 RxTest 和 RxBlocking
我需要一种功能,允许将消息异步推送到我的 PublishSubject 并通过 ConnectableObservable 以一定的速度(实际上是一个接一个)处理它们。不幸的是,在底层的Subscri
RxJava 查询 : 你好, 我有一个 PublishSubject subject = PublishSubject.create() ; 我正在订阅上述主题并在此之后进行 API 调用: sub
我在我的项目中使用 ReactiveSwift,我想知道 PublishSubject 的等价物是什么? 例如在 RXSwift 中我们可以这样做: let disposeBag = DisposeB
我正在创建基于 CocoaAsyncSocket 和 STOMP 协议(protocol)的简单消息传递应用程序。所以我创建了主类,它使用 PublishSubject ,因此订阅者可以观察传入的 S
我是一名优秀的程序员,十分优秀!