gpt4 book ai didi

java - 为什么 rx-java 邮政编码失败

转载 作者:行者123 更新时间:2023-12-01 09:08:30 24 4
gpt4 key购买 nike

这是测试代码

    final Flowable<Integer> f1 = Flowable.fromPublisher(s -> {
s.onNext(Integer.valueOf(1));
s.onComplete();
});


final Flowable<Integer> f2 = Flowable.fromPublisher(s -> {
s.onNext(Integer.valueOf(2));
s.onComplete();
});

Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
.blockingSubscribe(System.out::println);

会得到

Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:386)

不明白为什么?

如果我像这样更新代码

    final Flowable<Integer> f1 = Flowable.<Integer>fromPublisher(s -> {
s.onNext(Integer.valueOf(1));
s.onComplete();
}).onErrorResumeNext(Flowable.empty());


final Flowable<Integer> f2 = Flowable.<Integer>fromPublisher(s -> {
s.onNext(Integer.valueOf(2));
s.onComplete();
}).onErrorResumeNext(Flowable.empty());

Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
.blockingSubscribe(System.out::println);

它将按预期打印 12。但为什么?这没有意义。

最佳答案

问题是你违反了 Publisher<T> 的契约(Contract)随着您使用 fromPublisher .

发布商需要以 Reactive Streams 中指定的非常具体的方式行事。契约(Contract)。该行为包括调用 Subscriber.onSubscribe()在进行任何其他调用并尊重该订阅者的背压之前。

因为你不打电话onSubscribe内部queue变量永远不会被初始化并且调用 queue.offer在其 onNext方法会导致 NPE。

大概是通过使用 onErrorResumeNext该实现确保一切都被正确调用,“修复”无效状态。

要解决您的问题,有两种可能性:

  1. 请勿使用Flowable.fromPublisher 。它旨在连接 react 流声明的其他实现,并且没有任何保障措施。而是使用Flowable.create它正确处理初始化和背压。
  2. 使用非背压感知 Observable因为您的用例似乎并不关心背压。再次使用Observable.create安全使用方法。

关于java - 为什么 rx-java 邮政编码失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41071413/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com