java - RxJava : Reduce not working as expected

Reposted. Author: 行者123. Updated: 2023-12-01 18:00:25

I am trying to load a User object in parallel.

    final User user = new User();
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS")
            .flatMap(field -> getOrchestrator(user, field))
            .scan(new User(), (finalUser, event) -> {
                finalUser = event;
                return finalUser;
            });

scan does emit three User objects, whereas reduce does not emit any item at all. What am I doing wrong here?

    final User user = new User();
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS")
            .flatMap(field -> getOrchestrator(user, field))
            .reduce(new User(), (finalUser, event) -> {
                finalUser = event;
                return finalUser;
            });

getOrchestrator returns an Observable<User>. Any help would be appreciated.

Below is the complete code snippet.

    public class Orchestrator {
        private String userId;

        public Orchestrator(final String userId) {
            this.userId = userId;
        }

        public static void main(final String[] args) throws Exception {
            final User user = new User();
            final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS")
                    .flatMap(field -> getOrchestrator(user, field))
                    .scan(new User(), (finalUser, event) -> {
                        finalUser = event;
                        return finalUser;
                    });

            userObs.subscribeOn(Schedulers.io()).subscribe(result -> {
                System.out.println(result.toString());
            });

            TimeUnit.SECONDS.sleep(10);
        }

        private static Observable<User> getOrchestrator(final User user, final String fieldName) {
            switch (fieldName) {
                case "CURRENT_ADDRESS":
                    return new AddressOrchestrator().getCurrentAddress(user.getUserId())
                            .map(currentAddress -> {
                                user.setAddress(currentAddress);
                                try {
                                    TimeUnit.MILLISECONDS.sleep(200);
                                }
                                catch (final InterruptedException e) {
                                }
                                return user;
                            });
                case "ADDRESSES":
                    return new AddressOrchestrator().getAddresses(user.getUserId())
                            .map(addresses -> {
                                user.setAddresses(addresses);
                                try {
                                    TimeUnit.MILLISECONDS.sleep(200);
                                }
                                catch (final InterruptedException e) {
                                }
                                return user;
                            });
                case "NAMES":
                    return new NameOrchestrator().getNames(user.getUserId())
                            .map(names -> {
                                user.setNames(names);
                                try {
                                    TimeUnit.MILLISECONDS.sleep(200);
                                }
                                catch (final InterruptedException e) {
                                }
                                return user;
                            });
            }
            return null;
        }

        public User getUser() {
            final Random r = new Random();
            if (r.nextInt(3) % 2 == 0) {
                return new User();
            }
            throw new RuntimeException();
        }
    }

Each orchestrator returns an Observable.

    public class AddressOrchestrator {

        public Observable<List<Address>> getAddresses(final String userId) {
            return Observable.create(s -> {
                final Address currentAddress = this.getBaseAddress(userId);
                final Address anotherAddress = this.getBaseAddress(userId);
                anotherAddress.setState("NE");
                s.onNext(Arrays.asList(currentAddress, anotherAddress));
            });
        }

        public Observable<Address> getCurrentAddress(final String userId) {
            return Observable.create(s -> s.onNext(this.getBaseAddress(userId)));
        }

        public Address getBaseAddress(final String userId) {
            final Address address = new Address();
            address.setLine1("540 Caddo Lake Dr");
            address.setCity("Georgetown");
            address.setCountry("USA");
            address.setState("TX");

            return address;
        }
    }

    public class NameOrchestrator {

        public Observable<List<Name>> getNames(final String userId) {
            return Observable.create(s -> {
                final Name name = new Name();
                name.setName("Vanchi");
                final Name formerName = new Name();
                formerName.setName("Vanchinathan");
                s.onNext(Arrays.asList(name, formerName));
            });
        }
    }

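Best answer

The orchestrators you create with Observable.create (which is a big red flag unless you really know what you are doing) never terminate. This results in an infinite stream, in the sense that a completion event is never emitted. reduce emits its single result only once the source completes, whereas scan emits the running value after every item, which is why scan produces output here and reduce does not. What you need is something like: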

    public Observable<List<Address>> getAddresses(final String userId) {
        return Observable.create(s -> {
            final Address currentAddress = this.getBaseAddress(userId);
            final Address anotherAddress = this.getBaseAddress(userId);
            anotherAddress.setState("NE");
            s.onNext(Arrays.asList(currentAddress, anotherAddress));
            // Signal the end of the stream so that downstream operators such as reduce can emit.
            s.onCompleted();
        });
    }

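Note the call to onCompleted. This will fix your immediate problem, but you would be better off getting rid of Observable.create in the first place and using something like Observable.defer.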
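To make the difference between scan and reduce concrete, here is a minimal sketch that uses only the JDK. It is a toy model, not the RxJava API: the `Observer` and `Source` interfaces, the `fields` helper, and the `"(seed)"` value are all hypothetical names invented for this illustration. It assumes the behaviour described above, namely that scan emits the seed and then the running value after each item, while reduce holds its result until the completion signal arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Toy model (NOT the RxJava API) of why reduce needs a completion signal. */
public class ReduceNeedsCompletion {

    /** Hypothetical minimal observer with the two callbacks that matter here. */
    public interface Observer<T> {
        void onNext(T item);
        void onCompleted();
    }

    /** Hypothetical minimal push source: on subscribe it drives the observer synchronously. */
    public interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    /** scan: emits the seed, then the running accumulator after every item. */
    public static <T> List<T> scan(Source<T> source, T seed, BinaryOperator<T> f) {
        List<T> emitted = new ArrayList<>();
        emitted.add(seed);
        source.subscribe(new Observer<T>() {
            T acc = seed;
            @Override public void onNext(T item) { acc = f.apply(acc, item); emitted.add(acc); }
            @Override public void onCompleted() { /* nothing left to emit */ }
        });
        return emitted;
    }

    /** reduce: accumulates silently and emits once, only when the source completes. */
    public static <T> List<T> reduce(Source<T> source, T seed, BinaryOperator<T> f) {
        List<T> emitted = new ArrayList<>();
        source.subscribe(new Observer<T>() {
            T acc = seed;
            @Override public void onNext(T item) { acc = f.apply(acc, item); }
            @Override public void onCompleted() { emitted.add(acc); }
        });
        return emitted;
    }

    /** Emits the three field names; signals completion only if asked to. */
    public static Source<String> fields(boolean complete) {
        return observer -> {
            observer.onNext("NAMES");
            observer.onNext("ADDRESSES");
            observer.onNext("CURRENT_ADDRESS");
            if (complete) {
                observer.onCompleted();
            }
        };
    }

    public static void main(String[] args) {
        // Same accumulator shape as in the question: keep the latest event.
        BinaryOperator<String> keepLatest = (acc, event) -> event;

        // prints [(seed), NAMES, ADDRESSES, CURRENT_ADDRESS]
        System.out.println(scan(fields(false), "(seed)", keepLatest));
        // prints [] because onCompleted is never called
        System.out.println(reduce(fields(false), "(seed)", keepLatest));
        // prints [CURRENT_ADDRESS]
        System.out.println(reduce(fields(true), "(seed)", keepLatest));
    }
}
```

The second call mirrors the situation in the question: the items arrive, but without a completion signal the reduced value is never released.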

Regarding java - RxJava : Reduce not working as expected, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/41431700/
