- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想为 monix.reactive.Observable
编写一个拆分函数.它应该拆分一个源 Observable[A]
变成一对新的 (Observable[A], Observable[A])
,基于谓词的值,针对源中的每个元素进行评估。我希望拆分独立于源 Observable 是热的还是冷的。在源是冷的情况下,新的 Observable 对也应该是冷的,而在源是热的情况下,新的 Observable 对将是热的。我想知道这样的实现是否可行,如果可以,如何实现(我在下面粘贴了一个失败的测试用例)。
签名,作为隐式类上的方法,看起来像或类似于
/**
* Split an observable by a predicate, placing values for which the predicate returns true
* to the right (and values for which the predicate returns false to the left).
* This is consistent with the convention adopted by Either.cond.
*/
def split(p: T => Boolean)(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[T], Observable[T]) = {
splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
}
PublishSubject
.因此,这对新的 Observable 很热门。我对冷 Observable 的测试失败了。
import monix.eval.TaskLike
import monix.execution.{Ack, Scheduler}
import monix.reactive.{Observable, Observer}
import monix.reactive.subjects.PublishSubject
import scala.concurrent.Future
object ObservableOps {
implicit class ObservableExtensions[T](o: Observable[T]) {
/**
* Split an observable by a predicate, placing values for which the predicate returns true
* to the right (and values for which the predicate returns false to the left).
* This is consistent with the convention adopted by Either.cond.
*/
def split(p: T => Boolean)(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[T], Observable[T]) = {
splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
}
/**
* Split an observable into a pair of Observables, one left, one right, according
* to a determinant function.
*/
def splitEither[U, V](f: T => Either[U, V])(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[U], Observable[V]) = {
val l = PublishSubject[U]()
val r = PublishSubject[V]()
o.subscribe(new Observer[T] {
override def onNext(elem: T): Future[Ack] = {
f(elem) match {
case Left(u) => l.onNext(u)
case Right(v) => r.onNext(v)
}
}
override def onError(ex: Throwable): Unit = {
l.onError(ex)
r.onError(ex)
}
override def onComplete(): Unit = {
l.onComplete()
r.onComplete()
}
})
(l, r)
}
}
}
//////////
import ObservableOps._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.reactive.subjects.PublishSubject
import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures._
class ObservableOpsSpec extends FlatSpec {
val isEven: Int => Boolean = _ % 2 == 0
"Observable Ops" should "split a cold observable" in {
val o = Observable(1, 2, 3, 4, 5)
val (l, r) = o.split(isEven)
l.toListL.runToFuture.futureValue shouldBe List(1, 3, 5)
r.toListL.runToFuture.futureValue shouldBe List(2, 4)
}
"Observable Ops" should "split a hot observable" in {
val o = PublishSubject[Int]()
val (l, r) = o.split(isEven)
val lbuf = l.toListL.runToFuture
val rbuf = r.toListL.runToFuture
Observable.fromIterable(1 to 5).mapEvalF(i => o.onNext(i)).subscribe()
o.onComplete()
lbuf.futureValue shouldBe List(1, 3, 5)
rbuf.futureValue shouldBe List(2, 4)
}
}
"Observable Ops" should "split a cold observable"
正在失败。
import monix.execution.Scheduler
import monix.reactive.Observable
object ObservableOps {
implicit class ObservableExtension[T](o: Observable[T]) {
/**
* Split an observable by a predicate, placing values for which the predicate returns true
* to the right (and values for which the predicate returns false to the left).
* This is consistent with the convention adopted by Either.cond.
*/
def split(
p: T => Boolean
)(implicit scheduler: Scheduler): (Observable[T], Observable[T]) = {
splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
}
/**
* Split an observable into a pair of Observables, one left, one right, according
* to a determinant function.
*/
def splitEither[U, V](
f: T => Either[U, V]
)(implicit scheduler: Scheduler): (Observable[U], Observable[V]) = {
val oo = o.map(f)
val l = oo.collect {
case Left(u) => u
}
val r = oo.collect {
case Right(v) => v
}
(l, r)
}
}
}
最佳答案
class ObservableOpsSpec extends FlatSpec {
val isEven: Int => Boolean = _ % 2 == 0
"Observable Ops" should "split a cold observable" in {
val o = Observable(1, 2, 3, 4, 5)
val o2 = o.publish
val (l, r) = o2.split(isEven)
val x= l.toListL.runToFuture
val y = r.toListL.runToFuture
o2.connect()
x.futureValue shouldBe List(1, 3, 5)
y.futureValue shouldBe List(2, 4)
}
"Observable Ops" should "split a hot observable" in {
val o = PublishSubject[Int]()
val (l, r) = o.split(isEven)
val lbuf = l.toListL.runToFuture
val rbuf = r.toListL.runToFuture
Observable.fromIterable(1 to 5).mapEvalF(i => o.onNext(i)).subscribe()
o.onComplete()
lbuf.futureValue shouldBe List(1, 3, 5)
rbuf.futureValue shouldBe List(2, 4)
}
}
关于scala - 拆分 Monix Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57909519/
我想为 monix.reactive.Observable 编写一个拆分函数.它应该拆分一个源 Observable[A]变成一对新的 (Observable[A], Observable[A]) ,
我正在尝试理解 Monix 中的任务调度原则。以下代码(来源:https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3)按预期仅生成
我正在尝试使用 monix 来并行化某些操作,然后执行错误处理 假设我正在尝试解析和验证这样的几个对象 def parseAndValidateX(x: X) Task[X] 和 def parseA
我正在使用 monix 任务,我正在 try catch Throwable,然后将其转换为自定义错误。我已删除/更改代码以使其简单且相关。这是代码(问题跟在代码片段之后): import io.ne
我正在使用 monix 任务,我正在 try catch Throwable,然后将其转换为自定义错误。我已删除/更改代码以使其简单且相关。这是代码(问题跟在代码片段之后): import io.ne
我正在尝试使用 monix 3.0.0-RC1 构建响应式应用程序。 比如a有一个Int的Seq,第二个元素是错误的。我可以使用 Oservable.raiseError(...) 来处理这个问题:
我尝试执行拆分单 Observable在 Monix 中按键,然后分组到最后 n每个 GrouppedObservable 中的事件并将其发送以进行进一步处理。问题是要分组的键的数量可能是无限的,这会
Monix 使用 Ack 来同步发出的消息,但是如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随 source 的背压. 请参阅此测试代码: import java
我有一个分页资源,我想用 Monix 递归地使用它。我想要一个 Observable,它将发出下载的元素并递归地使用页面。这是一个简单的例子。它当然不起作用。它发出第一页,然后是第一页 + 第二页,然
我目前正在致力于实现对 API 的客户端 http 请求,并决定为此任务探索 sttp 和 monix。由于我是 Monix 的新手,我仍然不确定如何运行任务并检索它们的结果。我的目标是获得一系列 h
我是一名优秀的程序员,十分优秀!