gpt4 book ai didi

scala - Akka Stream - 根据流中的元素选择接收器

转载 作者:行者123 更新时间:2023-12-04 14:54:34 25 4
gpt4 key购买 nike

我正在使用 Akka 流创建一个简单的消息传递服务。该服务就像邮件传递一样,其中来自源的元素包括 destinationcontent喜欢:

case class Message(destination: String, content: String)

并且服务应该根据 destination 将消息传递到适当的接收器 field 。我创建了一个 DeliverySink类让它有一个名字:
case class DeliverySink(name: String, sink: Sink[String, Future[Done]])

现在,我实例化了两个 DeliverySink ,让我调用他们 sinkXsinkY ,并根据他们的名字创建了一张 map 。在实践中,我想提供一个接收器名称列表,该列表应该是可配置的。

我面临的挑战是如何根据 destination 动态选择合适的接收器 field 。

最后,我想映射 Flow[Message]到水槽。我试过:
val sinkNames: List[String] = List("sinkX", "sinkY")
val sinkMapping: Map[String, DeliverySink] =
sinkNames.map { name => name -> DeliverySink(name, ???)}.toMap
Flow[Message].map { msg => msg.content }.to(sinks(msg.destination).sink)

但是,显然这不起作用,因为我们无法引用 msg map 外...

我想这不是一个正确的方法。我也想过用 filterbroadcast ,但如果目标扩展到 100,我无法键入每个路由。实现我的目标的正确方法是什么?

[编辑]

理想情况下,我想让目的地充满活力。因此,我无法在过滤器或路由逻辑中静态键入所有目的地。如果目标接收器尚未连接,它也应动态创建一个新接收器。

最佳答案

如果您必须使用多个接收器

Sink.combine 将直接满足您现有的要求。如果您附上合适的 Flow.filter每次前 Sink那么他们只会收到适当的消息。

不要使用多个接收器

总的来说,我认为流的结构和内容包含业务逻辑是糟糕的设计。您的流应该是在普通 scala/java 代码中的业务逻辑之上的背压并发的薄饰面。

在这种特殊情况下,我认为最好将目标路由包装在单个 Sink 中,并且逻辑应该在单独的函数中实现。例如:

val routeMessage : (Message) => Unit = 
(message) =>
if(message.destination equalsIgnoreCase "stdout")
System.out println message.content
else if(message.destination equalsIgnoreCase "stderr")
System.err println message.content

val routeSink : Sink[Message, _] = Sink foreach routeMessage

注意现在测试我的 routeMessage 容易多了因为它不在流内:我不需要任何 akka testkit“东西”来测试 routeMessage。我也可以将函数移到 FutureThread如果我的并发设计要改变。

许多目的地

如果您有多个目的地,您可以使用 Map .例如,假设您将消息发送到 AmazonSQS。您可以定义一个函数将队列名称转换为队列 URL,并使用该函数来维护已创建名称的 Map:
type QueueName = String

val nameToRequest : (QueueName) => CreateQueueRequest = ??? //implementation unimportant

type QueueURL = String

val nameToURL : (AmazonSQS) => (QueueName) => QueueURL = {
val nameToURL = mutable.Map.empty[QueueName, QueueURL]

(sqs) => (queueName) => nameToURL.get(queueName) match {
case Some(url) => url
case None => {
sqs.createQueue(nameToRequest(queueName))
val url = sqs.getQueueUrl(queueName).getQueueUrl()

nameToURL put (queueName, url)

url
}
}
}

现在您可以在单个 Sink 中使用这个非流函数:
val sendMessage : (AmazonSQS) => (Message) => Unit = 
(sqs) => (message) =>
sqs sendMessage {
(new SendMessageRequest())
.withQueueUrl(nameToURL(sqs)(message.destination))
.withMessageBody(message.content)
}

val sqs : AmazonSQS = ???

val messageSink = Sink foreach sendMessage(sqs)

旁注

对于 destination你可能想使用除 String 以外的其他东西. A coproduct通常更好,因为它们可以与 case 语句一起使用,如果您错过其中一种可能性,您将得到有用的编译器错误:
sealed trait Destination

object Out extends Destination
object Err extends Destination
object SomethingElse extends Destination

case class Message(destination: Destination, content: String)

//This function won't compile because SomethingElse doesn't have a case
val routeMessage : (Message) => Unit =
(message) => message.destination match {
case Out =>
System.out.println(message.content)
case Err =>
System.err.println(message.content)
}

关于scala - Akka Stream - 根据流中的元素选择接收器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48758415/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com