- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
示例场景:将流的字节分组为大小由另一个流(整数)确定的 block 。
def partition[A, B, C](
first:Source[A, NotUsed],
second:Source[B, NotUsed],
aggregate:(Int => Seq[A], B) => C
):Source[C, NotUsed] = ???
val bytes:Source[Byte, NotUsed] = ???
val sizes:Source[Int, NotUsed] = ???
val chunks:Source[ByteString, NotUsed] =
partition(bytes, sizes, (grab, count) => ByteString(grab(count)))
我最初的尝试包括 Flow#scan 的组合和 Flow#prefixAndTail ,但感觉不太对(见下文)。我也看了Framing ,但它似乎不适用于上面的示例场景(也不够通用以适应非字节串流)。我猜我唯一的选择是使用 Graphs (或更一般的 FlowOps#transform ),但我还不够精通 Akka 流来尝试这样做。
这是我到目前为止能够想到的(特定于示例场景):
val chunks:Source[ByteString, NotUsed] = sizes
.scan(bytes prefixAndTail 0) {
(grouped, count) => grouped flatMapConcat {
case (chunk, remainder) => remainder prefixAndTail count
}
}
.flatMapConcat(identity)
.collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) }
最佳答案
我认为您可以将处理实现为自定义 GraphStage
。舞台将有两个 Inlet
元素。一个获取字节,另一个获取大小。它将有一个 Outlet
元素产生值。
考虑以下输入流。
def randomChars = Iterator.continually(Random.nextPrintableChar())
def randomNumbers = Iterator.continually(math.abs(Random.nextInt() % 50))
val bytes: Source[Char, NotUsed] =
Source.fromIterator(() => randomChars)
val sizes: Source[Int, NotUsed] =
Source.fromIterator(() => randomNumbers).filter(_ != 0)
然后使用描述自定义流处理的信息 (http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html),您可以构造 GraphStage
。
case class ZipFraming() extends GraphStage[FanInShape2[Int, Char, (Int, ByteString)]] {
override def initialAttributes = Attributes.name("ZipFraming")
override val shape: FanInShape2[Int, Char, (Int, ByteString)] =
new FanInShape2[Int, Char, (Int, ByteString)]("ZipFraming")
val inFrameSize: Inlet[Int] = shape.in0
val inElements: Inlet[Char] = shape.in1
def out: Outlet[(Int, ByteString)] = shape.out
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// we will buffer as much as 512 characters from the input
val MaxBufferSize = 512
// the buffer for the received chars
var buffer = Vector.empty[Char]
// the needed number of elements
var needed: Int = -1
// if the downstream is waiting
var isDemanding = false
override def preStart(): Unit = {
pull(inFrameSize)
pull(inElements)
}
setHandler(inElements, new InHandler {
override def onPush(): Unit = {
// we buffer elements as long as we can
if (buffer.size < MaxBufferSize) {
buffer = buffer :+ grab(inElements)
pull(inElements)
}
emit()
}
})
setHandler(inFrameSize, new InHandler {
override def onPush(): Unit = {
needed = grab(inFrameSize)
emit()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
isDemanding = true
emit()
}
})
def emit(): Unit = {
if (needed > 0 && buffer.length >= needed && isDemanding) {
val (emit, reminder) = buffer.splitAt(needed)
push(out, (needed, ByteString(emit.map(_.toByte).toArray)))
buffer = reminder
needed = -1
isDemanding = false
pull(inFrameSize)
if (!hasBeenPulled(inElements)) pull(inElements)
}
}
}
}
这就是您运行它的方式。
RunnableGraph.fromGraph(GraphDSL.create(bytes, sizes)(Keep.none) { implicit b =>
(bs, ss) =>
import GraphDSL.Implicits._
val zipFraming = b.add(ZipFraming())
ss ~> zipFraming.in0
bs ~> zipFraming.in1
zipFraming.out ~> Sink.foreach[(Int, ByteString)](e => println((e._1, e._2.utf8String)))
ClosedShape
}).run()
关于scala - 如何根据另一个 Akka 流的元素聚合一个 Akka 流的元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36109241/
我有一个 Cassandra 集群,里面有 4 个表和数据。 我想使用聚合函数(sum,max ...)发出请求,但我在这里读到这是不可能的: http://www.datastax.com/docu
我有以下两张表 Table: items ID | TITLE 249 | One 250 | Two 251 | Three 我投票给这些: Table: votes VID | IID | u
这个问题在这里已经有了答案: Update MongoDB field using value of another field (12 个答案) 关闭 3 年前。 我想根据另一个“源”集合的文档中
我的收藏包含以下文件。我想使用聚合来计算里面有多少客户,但我遇到了一些问题。我可以获得总行数,但不能获得总(唯一)客户。 [{ _id: "n001", channel: "Kalip
我有下表 Id Letter 1001 A 1001 H 1001 H 1001 H 1001 B 1001 H 1001 H 1001
得到一列的表 ABC。 “创建”的日期列。所以样本值就像; created 2009-06-18 13:56:00 2009-06-18 12:56:00 2009-06-17 14:02:0
我有一个带有数组字段的集合: {[ name:String buyPrice:Int sellPrice:Int ]} 我试图找到最低和最高买入/卖出价格。在某些条目中,买入或卖出价格为零
我有以下问题: 在我的 mongo db 中,我有以下结构: { "instanceId": "12", "eventId": "0-1b", "activityType":
下面给出的是我要在其上触发聚合查询的 Elasticsearch 文档。 { "id": 1, "attributes": [ { "fieldId": 1,
我正在使用 Django 的 aggregate query expression总计一些值。最终值是一个除法表达式,有时可能以零作为分母。如果是这种情况,我需要一种方法来逃避,以便它只返回 0。 我
我正在学习核心数据,特别是聚合。 当前我想要做的事情:计算表中在某些条件上具有逆关系的多对关系的记录数。 目前我正在这样做: NSExpression *ex = [NSExpression expr
我需要有关 Delphi 中的 ClientDatasets 的一些帮助。 我想要实现的是一个显示客户的网格,其中一列显示每个客户的订单数量。我将 ClientDataset 放在表单上并从 Delp
我的集合有 10M 个文档,并且有一个名为 movieId 的字段;该文档具有以下结构: { "_id" : ObjectId("589bed43e3d78e89bfd9b779"), "us
这个问题已经有答案了: What is the difference between association, aggregation and composition? (21 个回答) 已关闭 9
我在 elasticsearch 中有一些类似于这些示例的文档: { "id": ">", "list": [ "a", "b", "c" ] } { "id"
我正在做一些聚合。但是结果完全不是我所期望的,似乎它们没有聚合索引中与我的查询匹配的所有文档,在这种情况下 - 它有什么好处? 例如,首先我做这个查询: {"index":"datalayer","t
假设我在 ES 中有这些数据。 | KEY | value | |:-----------|------------:| | A |
可能在我的文档中,我有一个被分析的文本字段。我只是在ElasticSearch AggregationAPI中迷路了。我需要2种不同情况的支持: 情况A)结果是带有计数标记(条款)的篮子下降。 情况B
我正在为网上商店构建多面过滤功能,如下所示: Filter on Brand: [ ] LG (10) [ ] Apple (5) [ ] HTC (3) Filter on OS: [ ] Andr
我有一个父/子关系并且正在搜索 child 。 是否可以在父属性上创建聚合? 例如parent 是 POST,children 是 COMMENT。如果父项具有“类别”属性,是否可以搜索 COMMEN
我是一名优秀的程序员,十分优秀!