gpt4 book ai didi

java - RxJava - Flowable.map 在 flatMap 之后没有被调用

转载 作者:行者123 更新时间:2023-12-01 16:29:54 25 4
gpt4 key购买 nike

我在 rxjava (io.reactivex.rxjava2 - v2.0.8) 中的 Flowable 与 Observable 方面遇到了奇怪的问题。这里我的代码如下所示,其中 ma​​p(...).subscribe(...) 函数没有被调用/执行。

flowable.flatMap(...return new flowable...).map(...).subscribe(...)

令人惊讶的是,如果我翻转代码以使用 Observable 而不是 Flowable ,则 ma​​p(...).subscribe(...) 将被调用/按预期执行。我可能会遗漏一些简单的东西,请让我知道它可能是什么?

谢谢

`

public class Application {
public static void main(String[] args) {
// Note: Working; get the list of databases
DatabaseFlowable databases = new DatabaseFlowable(sourceClient);
// Note: Working; for each database get the list of collections in it
Flowable<Resource> resources = databases
.flatMap(db -> {
logger.info(" ==> found database {}", db.getString("name"));
return new CollectionFlowable(sourceClient, db.getString("name"));
// Note: Working; CollectionFlowable::subscribeActual works as well
});
resources
.map(resource -> {
// Note: Nothing in here gets executed
logger.info(" ====> found resource {}", resource.toString());
return resource;
})
.subscribe(m -> {
// Note: Nothing in here gets executed
logger.info(m.toString());
});
}
}



public class DatabaseFlowable extends Flowable<Document> {
private final static Logger logger = LoggerFactory.getLogger(DatabaseFlowable.class);
private final MongoClient client;

public DatabaseFlowable(MongoClient client) {
this.client = client;
}

@Override
protected void subscribeActual(Subscriber<? super Document> subscriber) {
ListDatabasesIterable<Document> cursor = client.listDatabases();
MongoCursor<Document> iterator = cursor.iterator();

while(iterator.hasNext()) {
Document item = iterator.next();
if (!item.isEmpty()) {
String message = String.format(" found database name: %s, sizeOnDisk: %s",
item.getString("name"), item.get("sizeOnDisk"));
logger.info(message);
subscriber.onNext(item);
}
}
subscriber.onComplete();
}
}

public class CollectionFlowable extends Flowable<Resource> {

private final static Logger logger = LoggerFactory.getLogger(CollectionFlowable.class);
private final MongoClient client;
private final String databaseName;

public CollectionFlowable(MongoClient client, String databaseName) {
this.databaseName = databaseName;
this.client = client;
}


@Override
protected void subscribeActual(Subscriber<? super Resource> subscriber) {
MongoDatabase database = client.getDatabase(databaseName);

ListCollectionsIterable<Document> cursor = database.listCollections();
MongoCursor<Document> iterator = cursor.iterator();

while(iterator.hasNext()) {
Document item = iterator.next();
if (!item.isEmpty()) {
logger.info(" ... found collection: {}.{}", database.getName(), item.getString("name"));
Resource resource = new Resource(databaseName,
item.getString("name"),
(Document) item.get("options"));
subscriber.onNext(resource);
}
}
subscriber.onComplete();
}
}

`

最佳答案

那是因为您没有正确遵循 Flowable 协议(protocol)。没有调用 subscriber.onSubscribe(...) 并且不遵守 subscriber.request(...) 施加的限制。

由于您的实现根本不会观察到背压,因此可以使用 Flowable.create 创建一个可以观察到背压的缓冲版本,或者将您的实现移至不存在背压的 Observable

您看到的行为的原因是下游观察者尚未请求任何项目,因此您对 onNext 的调用将被丢弃。

关于java - RxJava - Flowable.map 在 flatMap 之后没有被调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43669230/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com