gpt4 book ai didi

java - `distinctUntilChanged` 一旦调用 onNext 如何允许相同的项目

转载 作者:行者123 更新时间:2023-11-29 23:25:33 24 4
gpt4 key购买 nike

我有一个连续发出项目的可观察对象,我需要处理每个项目(处理函数需要一些时间)。因此,同时在处理一个项目时,如果另一个项目发出相同的值,我可以忽略它,因为同样的事情已经在进行中。但是一旦当前项目被处理(并调用 onNext)。以后如果有同样的请求,我应该允许。我使用了 distinctUntildChanged 运算符,但我看到的是,如果当前项目与上一个项目相同,即使最后一个项目完成处理并调用了 onNext.

我有一个例子来证明这个问题

我有课

class User {
String id;
String name;

public User(String id, String name) {
this.id = id;
this.name = name;
}

@Override
public boolean equals(Object obj) {
User obj1 = (User) obj;
return id.equals(obj1.id);
}

@Override
public String toString() {
return name;
}
}

和一个可观察的(主题)

Subject<User> mSubject = PublishSubject.create();

我的 Observable 链是

 mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.distinctUntilChanged()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
}).subscribe(user -> Log.d(TAG, "onNext: " + user.name));

我发出这样的值

    for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
}

结果是

emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19

这里所有的对象都是相同的(equals 方法检查 id)。如您所见,它第一次花费了 user0,并且需要 5 秒来处理,在此期间我可以忽略传入的项目,但在那之后 onNext: User 0 我应该允许相同的用户请求,但是 distinctUntilChanged 不允许,因为它把最后一个值等同于同一个用户,我该怎么做这?希望我的问题很清楚。

最佳答案

因此,您可以使用 Flowable 和正确的 BackpressureStrategy 来实现这一点。问题是您在执行 observeOn 时没有设置缓冲区大小。你可以试试这个(尽管是 Kotlin):

Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext { println("emitting $it") }
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.io(), false,1)
.subscribeOn(Schedulers.io())
.subscribe {
println("consuming $it")
Thread.sleep(500)
}

输出看起来像这样:

emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14

当您调用 observeOn(Scheduler) 时,如果我没记错的话,背压的默认缓冲区大小应该是 128。

您可以尝试将上述示例中的缓冲区大小更改为 3。您将得到:

emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...

关于java - `distinctUntilChanged` 一旦调用 onNext 如何允许相同的项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53647486/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com