- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我真的很喜欢 RxJava,它是一个很棒的工具,但有时很难理解它是如何工作的。我们在我们的 Android 项目中使用带有 RxJava 的 Retrofit,并且有以下用例:
我需要轮询服务器,重试之间有一些延迟,而服务器正在做一些工作。服务器完成后,我必须交付结果。所以我已经用 RxJava 成功地完成了,这里是代码 fragment :我将“skipWhile”与“repeatWhen”一起使用
Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
.skipWhile(new Func1<CheckJobResponse, Boolean>() {
@Override
public Boolean call(CheckJobResponse checkJobResponse) {
boolean shouldSkip = false;
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
switch (checkJobResponse.getJobStatus()){
case CheckJobResponse.PROCESSING:
shouldSkip = true;
break;
case CheckJobResponse.DONE:
case CheckJobResponse.ERROR:
shouldSkip = false;
break;
}
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
return shouldSkip;
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
return observable.delay(1, TimeUnit.SECONDS);
}
}).subscribe(new Subscriber<CheckJobResponse>(){
@Override
public void onNext(CheckJobResponse response) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
}
@Override
public void onError(BaseError error) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
}
@Override
public void onCompleted() {
if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
}
});
代码运行良好:
当服务器响应作业正在处理时,我从“skipWhile”链返回“true”,原始 Observable 等待 1 秒并再次执行 http 请求。重复此过程,直到我从“skipWhile”链返回“false”。
这里有一些我不明白的地方:
我在“skipWhile”的文档中看到它不会从原始 Observable 发出任何东西(onError、onNext、onComplete),直到我从它的“call”方法返回“false”。那么,如果它不发出任何东西,为什么“repeatWhen”Observable 会执行它的工作呢?它等待一秒钟并再次运行请求。谁发起的?
第二个问题是:为什么“repeatWhen”的 Observable 不会永远运行,我的意思是为什么当我从“skipWhile”返回“false”时它停止重复?如果我返回“false”,我会在我的订阅者中成功进入 onNext。
在“repeatWhile”的文档中说,最终我在我的订阅者中收到了对“onComplete”的调用,但从未调用过“onComplete”。
即使我更改链接“skipWhile”和“repeatWhen”的顺序也没有区别。这是为什么?
我知道 RxJava 是开源的,我只能阅读代码,但正如我所说 - 它真的很难理解。
谢谢。
最佳答案
我以前没有使用过repeatWhen
,但这个问题让我很好奇,所以我做了一些研究。
skipWhile
确实发出onError
和onCompleted
,即使它从不返回true
在那之前。因此,每次 checkJob()
发出 onCompleted
时都会调用 repeatWhen
。这回答了问题 #1。
其余问题基于错误的假设。您的订阅将永远运行,因为您的 repeatWhen
永远不会终止。那是因为 repeatWhen
比您意识到的要复杂得多。每当从源中获取 onCompleted
时,其中的 Observable
就会发出 null
。如果你接受它并返回 onCompleted
然后它结束,否则如果你发出任何东西它会重试。由于 delay
只是接受发射并延迟它,所以它总是再次发射 null
。因此,它会不断重新订阅。
#2 的答案是它永远运行;您可能正在执行此代码之外的其他操作来取消订阅。对于 #3,您永远不会得到 onCompleted
,因为它永远不会完成。对于 #4,顺序无关紧要,因为您要无限期地重复。
现在的问题是,如何获得正确的行为?就像使用 takeUntil
而不是 skipWhile
一样简单。这样,您将一直重复直到得到您想要的结果,从而在您希望它结束时终止流。
这是一个代码示例:
Observable<Boolean> source = ...; // Something that eventually emits true
source
.repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
.takeUntil(result -> result)
.filter(result -> result)
.subscribe(
res -> System.out.println("onNext(" + res + ")"),
err -> System.out.println("onError()"),
() -> System.out.println("onCompleted()")
);
在此示例中,source
发出 bool 值。我每 1 秒重复一次,直到源发出 true
。我一直服用直到 result
为 true
。我过滤掉所有 false
的通知,因此订阅者在它为 true
之前不会收到它们。
关于android - 在RxJava中使用 "skipWhile"结合 "repeatWhen"实现服务器轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34943734/
我有 C# 代码 string fileNameOnly = Path.GetFileNameWithoutExtension(sKey); string token = fileNameOnly.R
我有一个 IEnumerable 值,我需要在开始时跳过某些元素,为此我使用 SkipWhile 。但是,我肯定需要至少一个元素(假设序列甚至包含至少一个元素)。如果所有元素都通过谓词(即跳过所有元素
在 F# 中,我发现什么时候想使用 takeWhile ,我平时也想用skipWhile ,即取满足某个谓词的列表前缀,同时还要记住列表的其余部分,以便后续处理。我不认为有一个标准库函数可以做到这两者
skipWhile 和过滤运算符有什么区别? const source = interval(1000); const example = source.pipe(skipWhile(val => v
在第一个例子中,我的终端输出 next(1) next(2) next(3) next(4) next(5) next(6) next(7) next(8) next(9) next(10) comp
我有点惊讶地发现以下代码的结果,我只是想从一个整数序列中删除所有 3: var sequence = new [] { 1, 1, 2, 3 }; var result = sequence.Skip
我有以下按描述日期排序的行: var query = this.DbContext.Items.OrderByDescending(g => g.UpdatedAt); 0 Dog 201
我有以下用例: 有一项服务可以将具有特定 ID 的计算任务排队。我连接了一个 observable 来查询服务的状态。只要 ID 在队列中,查询就返回 true(尚未计算)。一旦查询为false,则表
我有一个 RekenReeks 类,它返回从 2 开始乘以 2 的数字。所以 {2,4,8,16,32,64} 现在我了解了 TakeWhile 和 SkipWhile 方法以及 LINQ。 所以我创
我有一个项目列表: List ItemList = new List; 有时列表只是部分填满或某些索引未被占用,因此当我使用 foreach 遍历列表时,它会给出错误,因为该项目为空。我想将该列表缩减
我正在尝试练习下面的示例,但是 SkipWhile() 操作得到了奇怪的输出,它没有显示预期的输出。有人能解释一下为什么吗? List emp = new List();
我看到了一些我很难理解的行为。给定一个 DateTime 列表,我只想按降序选择去年发生的日期。但是使用 OrderBy()、OrderByDescending() 和 SkipWhile() 的行为
我真的很喜欢 RxJava,它是一个很棒的工具,但有时很难理解它是如何工作的。我们在我们的 Android 项目中使用带有 RxJava 的 Retrofit,并且有以下用例: 我需要轮询服务器,重试
我需要按发布日期降序排列存储在数据库中的文章,然后使用 Id == 100 获取文章之后的前 20 条记录。 这就是我想用 Linq 做的事情: IQueryable articles = d
我找不到为什么会发生以下异常。非常感谢任何帮助。 // EdcsEntities is derived from System.Data.Objects.ObjectContext EdcsEntit
是否有任何特殊原因需要单独的方法 Skip和 SkipWhile ,而不是简单地重载相同的方法? 我的意思是,而不是Skip(int) , SkipWhile(Func) , 和 SkipWhile(
我是一名优秀的程序员,十分优秀!