gpt4 book ai didi

scala,将回调模式转换为函数式风格的内部迭代器

转载 作者:行者123 更新时间:2023-12-01 05:21:31 25 4
gpt4 key购买 nike

假设给出了这个 API 并且我们不能改变它:

object ProviderAPI {

trait Receiver[T] {
def receive(entry: T)
def close()
}

def run(r: Receiver[Int]) {
new Thread() {
override def run() {
(0 to 9).foreach { i =>
r.receive(i)
Thread.sleep(100)
}
r.close()
}
}.start()
}
}

在本例中, ProviderAPI.run需要一个 Receiver , 调用 receive(i) 10 次然后关闭。通常, ProviderAPI.run将调用 receive(i)基于一个可能是无限的集合。

此 API 旨在以命令式风格使用,例如外部迭代器。如果我们的应用程序需要过滤、映射和打印这个输入,我们需要实现一个混合所有这些操作的接收器:
object Main extends App {
class MyReceiver extends ProviderAPI.Receiver[Int] {
def receive(entry: Int) {
if (entry % 2 == 0) {
println("Entry#" + entry)
}
}
def close() {}
}

ProviderAPI.run(new MyReceiver())
}

现在,问题是如何使用函数式风格的 ProviderAPI,内部迭代器(不改变提供给我们的 ProviderAPI 的实现)。请注意,ProviderAPI 也可以调用 receive(i)无限次,因此不能将所有内容都收集到列表中(此外,我们应该一个一个地处理每个结果,而不是先收集所有输入,然后再处理它)。

我在问如何实现这样的 ReceiverToIterator ,这样我们就可以以函数式的方式使用 ProviderAPI:
object Main extends App {
val iterator = new ReceiverToIterator[Int] // how to implement this?
ProviderAPI.run(iterator)
iterator
.view
.filter(_ % 2 == 0)
.map("Entry#" + _)
.foreach(println)
}

更新

以下是四种解决方案:
  • IteratorWithSemaphorSolution :我首先提出的解决方法附在问题
  • QueueIteratorSolution :使用 BlockingQueue[Option[T]]根据nadavwr的建议。
    它允许生产者继续生产直到 queueCapacity在被消费者阻止之前。
  • PublishSubjectSolution :非常简单的解决方案,使用 PublishSubject来自 Netflix RxJava-Scala API。
  • SameThreadReceiverToTraversable :非常简单的解决方案,通过放宽问题的约束
  • 最佳答案

    更新:1 个条目的 BlockingQueue

    您在这里实现的本质上是 Java 的 BlockingQueue,队列大小为 1。

    主要特点: super 阻塞。缓慢的消费者会扼杀生产者的表现。

    更新: @gzm0 提到 BlockingQueue 不包括 EOF。为此,您必须使用 BlockingQueue[Option[T]]。

    更新:这是一个代码片段。它可以制作成适合您的 Receiver .
    其中一些灵感来自 Iterator.buffered .请注意 peek是一个误导性的名称,因为它可能会阻塞——hasNext 也会阻塞.

    // fairness enabled -- you probably want to preserve order...
    // alternatively, disable fairness and increase buffer to be 'big enough'
    private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

    // the following block provides you with a potentially blocking peek operation
    // it should `queue.take` when the previous peeked head has been invalidated
    // specifically, it will `queue.take` and block when the queue is empty
    private var head: Option[T] = _
    private var headDefined: Boolean = false
    private def invalidateHead() { headDefined = false }
    private def peek: Option[T] = {
    if (!headDefined) {
    head = queue.take()
    headDefined = true
    }
    head
    }

    def iterator = new Iterator[T] {

    // potentially blocking; only false upon taking `None`
    def hasNext = peek.isDefined

    // peeks and invalidates head; throws NoSuchElementException as appropriate
    def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get
    }
    }

    替代方案:迭代器

    基于迭代器的解决方案通常会涉及更多的阻塞。从概念上讲,您可以在执行迭代的线程上使用延续来避免阻塞线程,但是延续与 Scala 的 for-comprehensions 困惑,所以在这条路上没有乐趣。

    或者,您可以考虑基于迭代的解决方案。迭代器与迭代器的不同之处在于消费者不负责推进迭代——生产者负责。对于迭代器,消费者基本上会随着时间的推移折叠生产者推送的条目。可以在线程池中折叠每个可用的下一个条目,因为在每个折叠完成后线程会被放弃。

    你不会得到很好的 for 语法迭代,学习曲线有点挑战,但如果你有信心使用 foldLeft你最终会得到一个看起来很合理的非阻塞解决方案。

    要阅读有关迭代器的更多信息,我建议查看 PlayFramework 2.X's iteratee reference .该文档描述了他们的独立 iteratee 库,该库在 Play 上下文之外 100% 可用。 Scalaz 7 也有一个全面的 iteratee 库。

    关于scala,将回调模式转换为函数式风格的内部迭代器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16324429/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com