gpt4 book ai didi

akka - 创建用于并行处理集合元素的 Akka 流

转载 作者:行者123 更新时间:2023-12-05 01:24:37 26 4
gpt4 key购买 nike

我正在尝试为包含并行处理流的 Akka 流定义一个图(我正在使用 Akka.NET,但这应该无关紧要)。想象一个订单的数据源,每个订单由一个订单 ID 和一个产品列表(订单商品)组成。工作流程如下:

  1. 接收和订购
  2. 将订单广播给两个流,流A处理订单项, channel B处理订单ID(一些簿记工作)
  3. 流程 A:将订单项的集合拆分为单独的元素,每个元素单独处理
  4. 流程 A:对于上一步拆分后产生的每个订单项目,调用一些外部服务来查找额外信息(价格、可用性等)
  5. 流程 B:为给定的订单 ID 做一些额外的簿记
  6. 合并流程 A 和 B
  7. 将上一步的合并数据发送到接收器,从而丰富订单信息

步骤 1 (Source.From)、2 (Broadcast)、4-5 (Map)、6 (Merge)、7 (Sink) 看起来不错。但是如何在 Akka 或 react 流术语中实现集合拆分?这不是广播或扁平化,N 个元素的集合需要拆分成 N 个独立的子流,这些子流稍后将被合并回来。这是如何实现的?

最佳答案

我建议在一个流程中完成。我知道两个流程看起来更酷,但相信我,就设计的简单性而言,这不值得(我试过)。你可以这样写

import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow}

import scala.collection.immutable
import scala.concurrent.Future

case class Item()

case class Order(items: List[Item])

val flow = Flow[Order]
.mapAsync(4) { order =>
Future {
// Enrich your order here
order
}
}
.mapConcat { order =>
order.items.map(order -> _)
}
.mapAsync(4) { case (order, item) =>
Future {
// Enrich your item here
order -> item
}
}
.groupBy(2, tuple => tuple._1)
.fold[Map[Order, List[Item]]](immutable.Map.empty) { case (map, (order, item)) => map.updated(order, map.getOrElse(order, Nil) :+ item) }
.mapConcat { _.map { case (order, newItems) => order.copy(items = newItems)} }

但即使是这种方法也很糟糕。上面的代码或您的设计可能会出错的地方太多了。如果订单项目之一的充实失败,您会怎么做?如果订单对象的充实失败怎么办?您的流应该如何处理?

如果我是你,我会使用 Flow[Order] 并在 mapAsync 中处理它的子项,这样至少它可以保证我没有部分处理的订单。

关于akka - 创建用于并行处理集合元素的 Akka 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40117489/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com