
rx-java - RxJava backpressure and the number of calls to the producer

Reposted. Author: 行者123. Updated: 2023-12-02 21:29:20

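I am trying to build infinite scrolling in my Android app using backpressure in RxJava. I want the external service to be called only as many times as requested (that is, once after each call to request(1)). But once flatMap is involved, every subscribe loads 16 pages.

My code is below, together with the expected results. Almost every test fails because of the first request (n=16).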

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;

public class ServiceObservablesTest {

    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservable = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // On subscribe, RxJava requests 16 elements, probably because of flatMap.
                // After that first request of 16, everything seems to work fine,
                // even though the 'n' parameter is ignored here.

                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);
            });
        });
        return metaObservable.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }

    public interface DataProvider<T> {
        Observable<List<T>> requestPage(int page);
    }

    private DataProvider provider;

    @Before
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }

    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }


    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given
        int requested_pages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requested_pages);

        //then
        subscriber.assertValueCount(requested_pages);
        subscriber.assertNotCompleted();

        for (int i = 0; i < requested_pages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requested_pages);
    }


    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given
        int emptyPage = 2;
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();

        verify(provider, times(1)).requestPage(0); //requested at subscribe
        for (int i = 1; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);
    }

    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);
    }

    @Test
    public void shouldWorkWithMultipleSubscribers() {
        //given
        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);

        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);

        //when
        subscriber1.requestMore(1);

        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber2.requestMore(1);

        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }

}

Best answer

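Backpressure exists to negotiate the behavior of concurrent producers and consumers, and to let the program's author choose a strategy for what happens when data is produced faster than it is consumed.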
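The request-based negotiation described above can be pictured with a small plain-Java model. This is only a sketch: `RequestModel` and `PageSource` are hypothetical names, no RxJava classes are involved, and the `request` method merely mirrors the idea behind a producer's request(n).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

public class RequestModel {

    /** Hypothetical pull-based source: emits page numbers only on demand. */
    static final class PageSource {
        private final IntConsumer downstream;
        private int nextPage = 0;

        PageSource(IntConsumer downstream) {
            this.downstream = downstream;
        }

        /** Emits exactly n more items, never more than the consumer asked for. */
        void request(long n) {
            for (long i = 0; i < n; i++) {
                downstream.accept(nextPage++);
            }
        }
    }

    /** Requests one item, then two more, and returns everything received. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        PageSource source = new PageSource(received::add);
        source.request(1);
        source.request(2);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2]
    }
}
```

The consumer, not the producer, decides how much work is done: three items are produced because three were requested in total.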

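That said, operators that combine observables, such as merge (flatMap is implemented as map followed by merge), will hand you a requested amount that does not match the amount of data you actually need. When merging, the outer observable (the Observable of Observables) always receives a request of 16 on RxAndroid (128 on RxJava). Then, as it receives the inner Observables of List, each inner Observable receives a request based on the amount requested by the downstream subscriber. If you wanted to avoid the Observable<Observable<T>>, you would be forced to write an OnSubscribe<Observable<List<T>>> function that manages the merging behavior internally, so that the result is really an Observable<List<T>> rather than an Observable<Observable<List<T>>>. Writing it would force you to subscribe to the observable returned by the data provider in order to unwrap it and call onNext with the List<T>.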
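To make the effect of that up-front request concrete, here is a simplified plain-Java model. It is not RxJava's merge implementation: `MergePrefetchModel`, `CountingProvider`, `OuterSource` and `MergeStage` are hypothetical names, the outer source is assumed to honor n, and replenishment of the buffer is left out. It only shows that the number of provider calls follows the prefetch amount, not the downstream demand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

public class MergePrefetchModel {

    /** Hypothetical stand-in for the question's DataProvider; counts its calls. */
    static final class CountingProvider {
        int calls = 0;

        List<String> requestPage(int page) {
            calls++;
            return Collections.singletonList("item-of-page-" + page);
        }
    }

    /** Outer source that honors n: one provider call per requested element. */
    static final class OuterSource {
        private final CountingProvider provider;
        private int nextPage = 0;

        OuterSource(CountingProvider provider) {
            this.provider = provider;
        }

        List<List<String>> request(int n) {
            List<List<String>> pages = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                pages.add(provider.requestPage(nextPage++));
            }
            return pages;
        }
    }

    /** Merge-like stage: asks upstream for `prefetch` elements as soon as it is created. */
    static final class MergeStage {
        private final Queue<List<String>> buffer = new ArrayDeque<>();

        MergeStage(OuterSource source, int prefetch) {
            buffer.addAll(source.request(prefetch));
        }

        /** Hands at most n buffered pages to the downstream consumer. */
        List<List<String>> request(int n) {
            List<List<String>> out = new ArrayList<>();
            while (out.size() < n && !buffer.isEmpty()) {
                out.add(buffer.poll());
            }
            return out;
        }
    }

    /** Returns how many provider calls happen for a given prefetch and demand. */
    static int providerCallsFor(int prefetch, int downstreamDemand) {
        CountingProvider provider = new CountingProvider();
        MergeStage stage = new MergeStage(new OuterSource(provider), prefetch);
        stage.request(downstreamDemand);
        return provider.calls;
    }

    public static void main(String[] args) {
        System.out.println(providerCallsFor(16, 1));  // prints 16
        System.out.println(providerCallsFor(128, 1)); // prints 128
    }
}
```

In this model a downstream demand of 1 still costs 16 provider calls, which is the behavior the question's first test trips over.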

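Instead, I would suggest mapping screen y-positions to end-of-page events, using scan to turn those events into a monotonically increasing number, and then using concatMap to turn that number into a call to DataProvider.requestPage().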

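screenYPositions
    // keep only the positions that should trigger a page load
    .filter(this::isUninitializedOrNearEndOfPage)
    // scan's first lambda argument is the accumulator, i.e. the running page number
    .scan(1, (pageNumber, event) -> pageNumber + 1)
    .concatMap(dataProvider::requestPage)
    .subscribe(testSubscriber);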
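The data flow of that suggestion can be traced without RxJava. The sketch below is a plain-Java model, not the answer's pipeline: `ScrollPagingModel`, `PAGE_HEIGHT`, `THRESHOLD` and the body of `isUninitializedOrNearEndOfPage` are all assumptions made for illustration, and page numbers start at 0 to match the question's tests.

```java
import java.util.ArrayList;
import java.util.List;

public class ScrollPagingModel {

    static final int PAGE_HEIGHT = 1000; // assumed pixel height of one loaded page
    static final int THRESHOLD = 200;    // assumed distance from the end that triggers a load

    private int loadedPages = 0;
    private final List<Integer> requested = new ArrayList<>();

    /** Assumed predicate: true before anything is loaded or near the end of loaded content. */
    boolean isUninitializedOrNearEndOfPage(int y) {
        return loadedPages == 0 || y >= loadedPages * PAGE_HEIGHT - THRESHOLD;
    }

    /** Processes one scroll position, step by step like the operator chain. */
    void onScroll(int y) {
        if (!isUninitializedOrNearEndOfPage(y)) {
            return;                      // filter step: ignore irrelevant positions
        }
        int pageNumber = loadedPages;    // scan step: running count of end-of-page events
        requested.add(pageNumber);       // concatMap step: one page request per event, in order
        loadedPages++;
    }

    /** Feeds the positions through the model and returns the requested page numbers. */
    static List<Integer> requestedPagesFor(int... yPositions) {
        ScrollPagingModel model = new ScrollPagingModel();
        for (int y : yPositions) {
            model.onScroll(y);
        }
        return model.requested;
    }

    public static void main(String[] args) {
        // Positions 0, 850 and 1900 trigger loads; the others are filtered out.
        System.out.println(requestedPagesFor(0, 100, 500, 850, 900, 1900)); // prints [0, 1, 2]
    }
}
```

The point of the design is that page requests are driven by UI events rather than by the downstream request amount, so an operator's prefetch size no longer decides how many pages get loaded.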

Regarding rx-java - RxJava backpressure and the number of calls to the producer, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/31967251/
