["Ter-6ren">
gpt4 book ai didi

scala - 使用 Scala Actors 来创建像流水线一样的东西

转载 作者:行者123 更新时间:2023-12-04 15:28:06 26 4
gpt4 key购买 nike

我现在已经为以下问题苦苦挣扎了一个星期,需要一些建议。

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

我想构建一个管道,如:
query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate (collected-meta-infos-state per query)
=> List[ TerminatorI-List[MetaInfo], TerminatorII-List[MetaInfo], ...]

到目前为止,我已经将每个 Pipeline-Segment 实现为 Actor。
我需要为每个查询创建专用的参与者实例,因为其中一些参与者(如 filterXXX 和 consolidate 需要维护每个查询的状态)。

像 askIMDB 这样的函数会产生多个结果,我想同时处理这些结果(每个结果都给一个单独的参与者)。所以我没有找到任何方法在执行 query() 之前预先构建整个 Actor 图,也没有找到一种在运行时修改它的优雅方法。

我的第一次尝试是使用一系列 actor 并在消息中传递诸如 Transaction-ID 之类的东西,因此每个 Actor 都有一个 Map[TransactionID->State] 但这感觉相当丑陋。
第二个尝试是创建一种管道,将参与者的有向图抽象为一个流,但到目前为止我失败了。

这是我的第一篇文章,对不起,如果我忘记了什么或者问题是通用/伪编码。非常感谢任何建议。谢谢!

最佳答案

建议你看看ScalaQuery ,它做同样的事情。它可以这样做,因为这是一个 monad 问题。实际上,一些 Haskell 解决方案如 Arrows,是由 Scalaz library 实现的。 ,似乎很接近。

那将是最好的解决方案,因为正确的抽象将使将来的更改更容易。

作为一个黑客,我认为是这样的:

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

class Query(title: String) {
self =>

// Create actors
def createActor(qm: QueryModifiers): Actor = {
val actor = qm match {
case Consolidate => // create a consolidator actor
case //... as needed
}
actor.start
actor
}

// The pipeline
val pipe: List[List[QueryModifiers]] = Nil

// Build the pipeline
def ->(qms: List[QueryModifiers]) = new Query(title) {
override val pipe = qms :: self.pipe
}
def ->(qm: QueryModifiers) = new Query(title) {
override val pipe = List(qm) :: self.pipe
}
def ->(c: Consolidate.type) = {
// Define the full pipeline
// Because the way pipe is built, the last layer comes first, and the first comes last
val pipeline = Consolidate :: pipe

// Create an actor for every QueryModifier, using an unspecified createActor function
val actors = pipeline map (_ map (createActor(_))

// We have a list of lists of actors now, where the first element of the list
// was the last QueryModifiers we received; so, group the layers by two, and for each
// pair, make the second element send the result to the first.
// Since each layer can contain many actors, make each member of the second
// layer send the results to each member of the first layer.
// The actors should be expecting to receive message SendResultsTo at any time.
for {
List(nextLayer, previousLayer) <- actors.iterator sliding 2
nextActor <- nextLayer
previousActor <- previousLayer
} previousActor ! SendResultsTo(nextActor)

// Send the query to the first layer
for ( firstActor <- actors.last ) firstActor ! Query(title)

// Get the result from the last layer, which is the consolidator
val results = actors.head.head !? Results

// Return the results
results
}
}

编辑

您也可以通过一些技巧来保证订购。我试图在这里避免使用 Scala 2.8,尽管它可以通过命名和默认参数使这变得更容易。
sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

// Build the pipeline
def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
}
// Do similarly for qm: NextQM

// Consolidation
def ->(qm: Consolidate.type) = {
// Create Searchers actors
// Send them the Filters
// Send them Fetchers
// Create the Consolidator actor
// Send it to Searchers actors
// Send Searchers the query
// Ask Consolidator for answer
}
}

object Query {
def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}

现在,搜索者参与者保留了一个过滤器列表、一个提取器列表和对整合器的引用。他们听取通知他们这些事情的消息,并进行查询。对于每个结果,他们为列表中的每个过滤器创建一个过滤器 actor,向每个过滤器发送 getter 和合并器列表,然后将结果发送给它们。

过滤器参与者保持一个 getter 列表和对整合器的引用。他们听取通知他们这些事情的消息,以及搜索者的结果。他们将他们的输出(如果有)发送给新创建的 fetcher actor,后者首先被告知整合者。

提取器保留对合并器的引用。他们收听通知他们该引用的消息,以及来自过滤器的结果。他们依次将结果发送给整合商。

合并器会收听两条消息。来自 fetcher actor 的一条消息通知他们他们积累的结果。来自 Query 的另一条消息要求返回该结果。

剩下的唯一事情就是设计一种方法让整合商知道所有结果都已处理。一种方法如下:
  • 在 Query 中,将创建的每个 Searcher 通知 Consolidator 参与者。集运商保留一份 list ,并带有一个标志,表明它们是否已完成。
  • 每个搜索者都会保存一个它创建的过滤器列表,并等待来自它们的“完成”消息。当搜索者没有剩余的处理要做并且从所有过滤器接收到“完成”时,它会向合并器发送一条消息,通知它已经完成。
  • 反过来,每个过滤器都会保存一个它创建的提取器列表,同样地,等待来自它们的“完成”消息。当它完成处理并从所有 getter 接收到“完成”时,它会通知搜索器它已经完成。
  • 当它的工作完成并发送到合并器时,它 getter 向创建它的过滤器发送一个“完成”消息。
  • 合并器仅在收到所有搜索者的“完成”后才监听查询结果的消息。
  • 关于scala - 使用 Scala Actors 来创建像流水线一样的东西,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1924092/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com