gpt4 book ai didi

android - 在 Android 中使用 RxJava 窗口或缓冲区进行批处理?

转载 作者:行者123 更新时间:2023-12-02 10:52:33 24 4
gpt4 key购买 nike

我希望在 api post 之前实现一个批处理机制,以进行一些简单的事件收集和日志记录。由于这是Android,我还想在该服务停止时处理生命周期事件,那么如果服务停止但计数或时间尚未达到,如何手动刷新缓冲窗口。

例如,我有一个 PublishSubject(主题),创建一个 flowable 并对其执行窗口操作,如下所示:

subject.toFlowable(BackpressureStrategy.BUFFER)
.window(30,
TimeUnit.SECONDS,
20,
true)
.flatMapSingle { it.toList() }
.subscribe (this::send)

如果我的服务/应用程序暂停或终止,我只想发送缓冲区中的内容。

最佳答案

您面临的问题是在必要时停止观察并刷新窗口中的当前项目。 Flowable.window() 运算符的文档这样说:

When the source Publisher completes or encounters an error, the resulting Publisher emits the current window and propagates the notification from the source Publisher.

因此,您需要使您的 Subject 发出错误或完成。在大多数情况下,这不是处理对象的正确方法。让我们将 Subject 替换为可以轻松完成的内容:

private val stopObserver = BehaviorSubject.create<Unit>() // (1)

private fun emitStop() { // (2)
stopObserver.onNext(Unit)
}

private fun sourceSubject(): Flowable<Long> { // (3)
return Flowable.interval(1, TimeUnit.SECONDS)
.takeUntil(stopObserver.toFlowable(BackpressureStrategy.BUFFER)) // (4)
}

private fun runObservation() { // (5)
sourceSubject()
.window(10)
.flatMapSingle { it.toList() }
.doOnNext { Log.d("onNext", "${it.count()} items") }
.subscribe()
}

重要部分说明:

  1. 创建新的 Subject,每次您意识到应用停止或暂停时都会发出该主题。
  2. 您只需在需要时使用函数 emitStop()Subject 发出 onNext 事件
  3. sourceSubject() 函数模仿您的源 Subject。这个每秒都会发出一个项目。
  4. 当传递的 Publisher ( stopObserver) 发出一个项目时,
  5. takeUntil() 运算符完成流。这确保了我们的整个source Publisher (sourceSubject) 完成。
  6. 我使用了 window() 运算符的简单版本,但它们都使用与源发布者相同的原则。

可能的输出:

2019-11-30 10:48:54.527 D/onNext: 10 items
2019-11-30 10:49:04.524 D/onNext: 10 items
2019-11-30 10:49:14.525 D/onNext: 10 items
2019-11-30 10:49:19.056 D/onNext: 4 items

关于android - 在 Android 中使用 RxJava 窗口或缓冲区进行批处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51047960/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com