gpt4 book ai didi

scala - scalaz-stream 中的桶式水槽

转载 作者:行者123 更新时间:2023-12-04 16:09:58 25 4
gpt4 key购买 nike

我正在尝试制作一个将流写入分桶文件的接收器:当达到特定条件(时间、文件大小等)时,当前输出流将关闭,并打开一个新的流到新的桶文件。

我检查了如何在 io 中创建不同的接收器对象,但例子并不多。所以我试着遵循 resourcechunkW被写。我最终得到了以下代码,为了简单起见,桶仅由 Int 表示。目前,但最终将是某种类型的输出流。

  val buckets: Channel[Task, String, Int] = {

//recursion to step through the stream
def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = {

// Emit the value and repeat
def next(msg: String => Task[Int]) =
Process.emit(msg) ++
go(step)


Process.await[Task, String => Task[Int], String => Task[Int]](step)(
next
, Process.halt // TODO ???
, Process.halt) // TODO ???
}

//starting bucket
val acquire: Task[Int] = Task.delay {
val startBuck = nextBucket(0)
println(s"opening bucket $startBuck")
startBuck
}

//the write step
def step(os: Int): Task[String => Task[Int]] =
Task.now((msg: String) => Task.delay {
write(os, msg)
val newBuck = nextBucket(os)
if (newBuck != os) {
println(s"closing bucket $os")
println(s"opening bucket $newBuck")
}
newBuck
})

//start the Channel
Process.await(acquire)(
buck => go(step(buck))
, Process.halt, Process.halt)
}

def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") }
def nextBucket(b: Int) = b+1

这里面有很多问题:
  • step在开始时通过桶一次,并且在递归过程中永远不会改变。我不确定如何在递归 go新建step将使用上一个任务中的存储桶 (Int) 的任务,因为我必须提供一个字符串才能完成该任务。
  • fallbackcleanupawait调用没有收到rcv的结果(如果有的话)。在 io.resource功能,它工作正常,因为资源是固定的,但是,就我而言,资源可能会在任何步骤发生变化。我将如何将当前打开的存储桶的引用传递给这些回调?
  • 最佳答案

    选项之一(即时间)可能是使用简单的 go在水槽上。这个使用基于时间的,基本上每小时重新打开文件:

    val metronome =  Process.awakeEvery(1.hour).map(true)


    def writeFileSink(file:String):Sink[Task,ByteVector] = ???


    def timeBasedSink(prefix:String) = {
    def go(index:Int) : Sink[Task,ByteVector] = {
    metronome.wye(write(prefix + "_" + index))(wye.interrupt) ++ go(index + 1)
    }

    go(0)
    }

    对于其他选项(即写入的字节),您可以使用类似的技术,只需保留写入的字节信号并将其与 Sink 结合。

    关于scala - scalaz-stream 中的桶式水槽,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22773392/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com