gpt4 book ai didi

kotlin - 如何合并2个单独的流,如何从中缓冲填充的数据并在一段时间后进行订阅

转载 作者:行者123 更新时间:2023-12-02 13:39:03 25 4
gpt4 key购买 nike

我正在尝试测试这种情况:

我有2个类,它们只是从同一个父类扩展而来。

我正在从每个类的项目列表中创建和Observables:

val listSomeClass1 = ArrayList<SomeClass1>()
val listSomeClass2 = ArrayList<SomeClass2>()

fun populateJust1() {
listSomeClass1.add(SomeClass1("23", 23))
listSomeClass1.add(SomeClass1("24", 24))
listSomeClass1.add(SomeClass1("25", 25))
}

fun populateJust2() {
listSomeClass2.add(SomeClass2(23.00))
listSomeClass2.add(SomeClass2(24.00))
listSomeClass2.add(SomeClass2(25.00))
}

populateItemsSomeClass1()
populateItemsSomeClass2()

现在我可以创建2个可观察对象:
  val someClass1Observable = Observable.fromIterable(listSomeClass1)
val someClass2Observable = Observable.fromIterable(listSomeClass2)

在这里,我想合并它们的发射,对其进行缓冲,并在10秒后订阅它:
      Observable.merge(someClass1Observable, someClass2Observable)
.buffer(10, TimeUnit.SECONDS)
.doOnSubscribe { Log.v("parentObservable", "STARTED") }
.subscribe { t: MutableList<Parent> ->
Log.v("parentObservable", "onNext")
t.forEach { Log.v("onNext", it.toString()) }
}

但是,可观察对象并没有像我预期的那样在10秒后开始,只是立即准备好此数据。
如何模拟这样的东西,我将收集2个独立的流,并且10秒钟后我将能够获取收集的数据

我必须指出,我不想使用任何主题。

更新

我做了这样的事:
  val list1 = listOf(SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3))
val list2 = listOf(SomeClass2(5.00), SomeClass2(4.00), SomeClass2(6.00))

val someClass1Observable = Observable
.fromIterable(list1)
.zipWith(Observable.interval(2, TimeUnit.SECONDS),
BiFunction { item: SomeClass1, _: Long -> item })



val someClass2Observable = Observable
.fromIterable(list2)
.zipWith(Observable.interval(1, TimeUnit.SECONDS),
BiFunction { item: SomeClass2, _: Long -> item })



someClass1Observable.subscribe {
Log.v("someClass1", it.toString())
}

someClass2Observable.subscribe {
Log.v("someClass2", it.toString())
}


Observable.merge(someClass1Observable, someClass2Observable)
.buffer(10, TimeUnit.SECONDS)
.delay(10, TimeUnit.SECONDS)
.doOnSubscribe { Log.v("parentObservable", "STARTED") }
.subscribe { t: MutableList<Parent> ->
Log.v("parentObservable", "onNext")
t.forEach { Log.v("onNext", it.toString()) }
}

Thread.sleep(13000)

someClass1Observable.subscribe {
Log.v("someClass1", it.toString())
}

someClass2Observable.subscribe {
Log.v("someClass2", it.toString())
}

在这里,我只想模拟someClass1和someclass2 Observable的2个无限流,并为合并Observable进行模拟。

同样,我希望能够合并这2个流,缓冲填充的数据并在10秒后对其进行处理。如果10秒之后,这2个流将再次填充一些数据,则合并Observable应该清除先前的缓冲区,并应再次缓冲新数据并在10秒后发出,依此类推,无限。但是,我的代码无法按我预期的那样工作,我需要做哪些更改才能使其如我所描述的那样?

最佳答案

我认为您正在寻找delay运算符

http://reactivex.io/documentation/operators/delay.html

Delay shift the emissions from an Observable forward in time by a particular amount



所以像:
.delay(10, TimeUnit.SECONDS)

关于kotlin - 如何合并2个单独的流,如何从中缓冲填充的数据并在一段时间后进行订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47359583/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com