gpt4 book ai didi

scala - 如何将 RX Observable 转换为 Play Enumerator

转载 作者:行者123 更新时间:2023-12-02 01:50:27 24 4
gpt4 key购买 nike

我成功地在 Play 中使用其原生枚举器构造设置了一个 websocket,调用了一些返回字符串的代码:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
val in = Iteratee.ignore[String]
val out = Enumerator.repeatM {
Promise.timeout(operation, 3 seconds)
}
(in, out)
}

现在我希望我的 operation 函数返回一个 rx.lang.scala.Observable[String] 而不是一个字符串,我想尽快输出任何字符串当它进入时。如何将此 Observable 映射到 play.api.libs.iteratee.Enumerator

最佳答案

您可以使用来自 Bryan Gilbert 的隐式转换。这将工作得很好,但要小心使用 updated version of Bryan Gilbert's conversions ! Jeroen Kransen 的回答中从未调用取消订阅(这很糟糕!)。

  /*
* Observable to Enumerator
*/
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
// unicast create a channel where you can push data and returns an Enumerator
Concurrent.unicast { channel =>
val subscription = obs.subscribe(new ChannelObserver(channel))
val onComplete = { () => subscription.unsubscribe }
val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
(onComplete, onError)
}
}

class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
override def onNext(elem: T): Unit = channel.push(elem)
override def onCompleted(): Unit = channel.end()
override def onError(e: Throwable): Unit = channel.end(e)
}

为了完整起见,这里是从 Enumerator 到 Observable 的转换:

  /*
* Enumerator to Observable
*/
implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
// creating the Observable that we return
Observable({ observer: Observer[T] =>
// keeping a way to unsubscribe from the observable
var cancelled = false

// enumerator input is tested with this predicate
// once cancelled is set to true, the enumerator will stop producing data
val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

// applying iteratee on producer, passing data to the observable
cancellableEnum (
Iteratee.foreach(observer.onNext(_))
).onComplete { // passing completion or error to the observable
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}

// unsubscription will change the var to stop the enumerator above via the breakE function
new Subscription { override def unsubscribe() = { cancelled = true } }
})
}

Play 中 WebSockets 的 Rx

另一方面,您可能会说,在 Play 中处理 Iteratees 和 Enumerator 的大部分时间是在使用 WebSockets 时(就像您在这里所做的那样)。我们都同意 Iteratees 确实不如 Observables 直观,这可能就是您在 Play 项目中使用 Rx 的原因。

根据那个观察,我构建了一个名为 WidgetManager 的库,它正是这样做的:在 Play 中集成 Rx,摆脱了 Iteratees 操作。

使用该库,您的代码可以简单地是:

def operationStatusFeed = WebSocket.using[String] { implicit request =>

// you can optionally give a function to process data from the client (processClientData)
// and a function to execute when connection is closed (onClientClose)
val w = new WidgetManager()

w.addObservable("op", operation)

// subscribe to it and push data in the socket to the client (automatic JS callback called)
w.subscribePush("op")

// deals with Iteratees and Enumerators for you and returns what's needed
w.webSocket
}

库位于 GitHub 上: RxPlay (欢迎投稿)

关于scala - 如何将 RX Observable 转换为 Play Enumerator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23043539/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com