gpt4 book ai didi

scala - 如何使用无限 Scala 流作为 Spark Streaming 中的源?

转载 作者:行者123 更新时间:2023-12-02 00:35:34 25 4
gpt4 key购买 nike

假设我本质上希望将 Stream.from(0) 作为 InputDStream。我该怎么办呢?我能看到的唯一方法是使用 StreamingContext#queueStream ,但我必须将另一个线程或子类 Queue 中的元素排入队列以创建一个行为类似于无限流,这两者都感觉像是黑客。

正确的做法是什么?

最佳答案

我认为默认情况下它在 Spark 中不可用,但使用 ReceiverInputDStream 很容易实现它。

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class InfiniteStreamInputDStream[T](
@transient ssc_ : StreamingContext,
stream: Stream[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {

override def getReceiver(): Receiver[T] = {
new InfiniteStreamReceiver(stream, storageLevel)
}
}

class InfiniteStreamReceiver[T](stream: Stream[T], storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

// Stateful iterator
private val streamIterator = stream.iterator

private class ReadAndStore extends Runnable {
def run(): Unit = {
while (streamIterator.hasNext) {
val next = streamIterator.next()
store(next)
}
}
}

override def onStart(): Unit = {
new Thread(new ReadAndStore).run()
}

override def onStop(): Unit = { }
}

关于scala - 如何使用无限 Scala 流作为 Spark Streaming 中的源?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27742044/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com