gpt4 book ai didi

java - RxJava : PublishSubject acts synchronously

转载 作者:行者123 更新时间:2023-12-02 02:03:42 28 4
gpt4 key购买 nike

我需要一种功能,允许将消息异步推送到我的 PublishSubject 并通过 ConnectableObservable 以一定的速度(实际上是一个接一个)处理它们。不幸的是,在底层的Subscriber处理消息之前,对PublishSubjectonNext的调用似乎不会被释放。

处理每条消息需要花费几秒钟的时间,并且在 Debug模式下,我看到它在调用将消息推送到 PublishSubject 的方法从堆栈中删除之前执行 - “推送后...” 始终出现在控制台中订阅者内的内部日志之后...

所以我有这个 RestEndpoint:

@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
executorService.execute(new Runnable() {

@Override
public void run() {
try {
Message metadata = processor.apply(extId);
log.info("Before push...");
dataImporter.pushData(metadata);
log.info("After push...");
} catch (Exception e) {
e.printStackTrace();
}
}
});
return Response.ok("Request received successfully").build();

}

这是 DataImporter 的构造函数:

public DataImporter(final String configFile) {
dataToImportSubject = PublishSubject.create();
dataToImportObservable = dataToImportSubject.publish();
dataToImportObservable.connect();
dataToImportObservable
.onBackpressureBuffer(1, new Action0() {

@Override
public void call() {
logger.debug("Buffer full...");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Message>() {

@Override
public void onCompleted() {
// TODO Auto-generated method stub

}

@Override
public void onError(Throwable e) {
logger.error("Error importing "+e.getMessage());
}

@Override
public void onNext(Message value) {
request(1);
importResult(configFile, value);
}

@Override
public void onStart() {
request(1);
}
});
}

然后 DataImporter 的 pushData 只是推送到 PublishSubjectonNext 方法..:

public void pushData(Message metadata) {
dataToImportSubject.onNext(metadata);
}

这是 PublishSubjectConnectableObservable 的声明:

public class DataImporter implements ImporterProxy{

private final PublishSubject<Message> dataToImportSubject;
private final ConnectableObservable<Message> dataToImportObservable;

最佳答案

PublishSubject 在原始 onXXX 调用的线程上向其使用者发送:

JavaDocs

Scheduler:

PublishSubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked.

您必须使用observeOn将处理移动到其他线程,因为observeOn可以将onXXX调用移动到另一个线程。

subscribeOn 一般对 Subject 没有任何实际作用,因为它只影响订阅线程,不会调制后续的 onXXX 调用这些主题。

关于java - RxJava : PublishSubject acts synchronously,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51160353/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com