
scala - Spark Scala UDP receive on listening port

Reposted. Author: 行者123. Updated: 2023-12-04 17:55:47

The example at http://spark.apache.org/docs/latest/streaming-programming-guide.html lets me receive packets over a TCP stream, listening on port 9999:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two worker threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))


// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

I can send data over TCP by setting up a data server on my Linux system with $ nc -lk 9999

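Question
I need to receive a stream from an Android phone over UDP using Scala/Spark. val lines = ssc.socketTextStream("localhost", 9999) only receives TCP streams.

How can I receive a UDP stream and create a Spark DStream from it in a similarly simple way using Scala and Spark?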

Best answer

There is nothing built in, but it is not much work to do it yourself. Here is a simple solution I made, based on a custom UdpSocketInputDStream[T]:

import java.io._
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

import scala.reflect.ClassTag
import scala.util.control.NonFatal

class UdpSocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  var udpSocket: DatagramSocket = _

  override def onStart(): Unit = {

    try {
      // Bind to the local address and port on which datagrams will arrive
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      case NonFatal(e) =>
        restart(s"Error binding to $host:$port", e)
        return
    }

    // Start the thread that receives data on the socket
    new Thread("Udp Socket Receiver") {
      setDaemon(true)

      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  /** Receive datagrams and store their contents until the receiver is stopped */
  def receive(): Unit = {
    try {
      val buffer = new Array[Byte](2048)

      // Loop until stopped, handling one datagram per iteration
      while (!isStopped()) {
        // Create a packet to receive data into the buffer
        val packet = new DatagramPacket(buffer, buffer.length)

        udpSocket.receive(packet)

        val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
        // Store every object decoded from this datagram
        while (!isStopped() && iterator.hasNext) {
          store(iterator.next())
        }
      }
    } catch {
      case NonFatal(e) =>
        // Closing the socket in onStop() makes receive() throw, so only restart on real errors
        if (!isStopped()) {
          restart("Error receiving data", e)
        }
    } finally {
      onStop()
    }
  }

  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}

To make this available as a method on StreamingContext itself, we enrich it with an implicit class:

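import java.io.InputStream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

object Implicits {
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}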
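
The enrichment pattern used above does not depend on Spark. As a minimal sketch of the same technique on a standard type, the following adds a method to String; the names StringSyntax, RichText and wordCount are hypothetical and exist only for this illustration.

```scala
object StringSyntax {
  // Hypothetical enrichment: once StringSyntax._ is imported, every String
  // gains a wordCount method. Extending AnyVal avoids allocating a wrapper.
  implicit class RichText(val s: String) extends AnyVal {
    def wordCount: Int = s.split("\\s+").count(_.nonEmpty)
  }
}
```

After import StringSyntax._ the call "some text".wordCount compiles as if wordCount were defined on String, which is the same mechanism that lets ssc.udpSocketStream(...) resolve after import Implicits._.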

This is how you call it:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.reflect.ClassTag

object TestRunner {
  import Implicits._

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))

    // Listen for UDP datagrams on localhost:3003 and decode each one into lines
    val stream = ssc.udpSocketStream("localhost",
                                     3003,
                                     bytesToLines,
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Converts the bytes of one datagram into an iterator over its UTF-8 lines
  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close(): Unit = {
        dataInputStream.close()
      }
    }
  }

  // Copied from Spark, where NextIterator is not part of the public API
  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false

    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }

    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }

    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        close()
      }
    }

    protected def getNext(): U
    protected def close(): Unit
  }
}

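Most of this code is taken from the SocketInputDStream[T] provided by Spark; I simply reused it. I also took the code for the NextIterator used by bytesToLines; all it does is consume the lines in the packet and convert them to String. If you have more complex logic, you can supply it by passing your own implementation of converter: InputStream => Iterator[T].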
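
To illustrate what a custom converter might look like, here is a minimal sketch that treats each datagram as a comma-separated list of integers. The object name Converters, the function bytesToInts and the payload format are all assumptions made for this example; they are not part of the original answer.

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets

import scala.io.Source
import scala.util.Try

object Converters {
  // Hypothetical converter: reads the payload as UTF-8 text such as "1,2,3"
  // and yields one Int per comma-separated field, skipping malformed fields.
  def bytesToInts(inputStream: InputStream): Iterator[Int] = {
    val text = Source.fromInputStream(inputStream, StandardCharsets.UTF_8.name).mkString
    text.split(",").iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .flatMap(field => Try(field.toInt).toOption)
  }
}
```

Passing Converters.bytesToInts in place of bytesToLines should give a stream of Int values rather than String values, since the element type follows the converter's return type.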

Testing it with a simple UDP packet:

echo -n "hello hello hello!" >/dev/udp/localhost/3003

Yields:

-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!
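
The /dev/udp path is a Bash feature, so it may not be available everywhere. As a rough alternative, the same test packet can be sent from Scala with the standard java.net classes. The object name UdpSender and its send method are hypothetical helpers written for this sketch.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

object UdpSender {
  // Sends one UTF-8 encoded message as a single datagram and
  // returns the number of payload bytes that were handed to the socket.
  def send(message: String, host: String, port: Int): Int = {
    val payload = message.getBytes(StandardCharsets.UTF_8)
    val socket = new DatagramSocket() // binds to any free local port
    try {
      socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(host), port))
      payload.length
    } finally {
      socket.close()
    }
  }
}
```

For the setup above, the call would be UdpSender.send("hello hello hello!", "localhost", 3003). UDP is connectionless, so the call succeeds even when nothing is listening on the target port.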

Of course, this needs further testing. There is also a hidden assumption that the buffer backing each DatagramPacket is 2048 bytes, which you may want to change.
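
To see why the 2048-byte buffer matters, the sketch below sends a datagram over loopback and reports how many bytes arrive for a given receive buffer size. BufferSizeDemo and receivedLength are hypothetical names for this illustration; the 65507 figure is the largest UDP payload that fits in an IPv4 packet.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

object BufferSizeDemo {
  // Largest UDP payload over IPv4: 65535 - 20 (IP header) - 8 (UDP header)
  val MaxUdpPayload = 65507

  // Sends payloadSize bytes over loopback and returns how many bytes
  // end up in a receive buffer of bufferSize bytes.
  def receivedLength(payloadSize: Int, bufferSize: Int): Int = {
    val loopback = InetAddress.getLoopbackAddress
    val receiver = new DatagramSocket(0, loopback) // port 0 picks any free port
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(2000) // fail instead of blocking forever
      val payload = new Array[Byte](payloadSize)
      sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort))
      val packet = new DatagramPacket(new Array[Byte](bufferSize), bufferSize)
      receiver.receive(packet)
      packet.getLength
    } finally {
      sender.close()
      receiver.close()
    }
  }
}
```

According to the DatagramSocket.receive documentation, a message longer than the packet's buffer is truncated, so a 3000-byte datagram read into a 2048-byte buffer loses its tail without any error. Sizing the receiver's buffer to MaxUdpPayload is one way to avoid that, at the cost of a larger allocation.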

Regarding scala - Spark Scala UDP receive on listening port, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41320483/
