- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
中提到的例子 http://spark.apache.org/docs/latest/streaming-programming-guide.html让我在 TCP 流中接收数据包并在 9999 端口
上监听import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
我可以通过在我的 Linux 系统中使用创建数据服务器来通过 TCP 发送数据$ nc -lk 9999
问题
我需要使用 UDP 和 Scala/Spark
从安卓手机接收流 val lines = ssc.socketTextStream("localhost", 9999)
仅在 TCP 流中接收。
我如何使用 Scala+Spark 以类似的简单方式接收 UDP 流并创建 Spark DStream。
最佳答案
没有内置的东西,但你自己完成它并没有太多的工作。这是我基于自定义 UdpSocketInputDStream[T]
制作的简单解决方案:
import java.io._
import java.net.{ConnectException, DatagramPacket, DatagramSocket, InetAddress}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import scala.reflect.ClassTag
import scala.util.control.NonFatal
class UdpSocketInputDStream[T: ClassTag](
_ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
class UdpSocketReceiver[T: ClassTag](host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel) extends Receiver[T](storageLevel) {
var udpSocket: DatagramSocket = _
override def onStart(): Unit = {
try {
udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
} catch {
case e: ConnectException =>
restart(s"Error connecting to $port", e)
return
}
// Start the thread that receives data over a connection
new Thread("Udp Socket Receiver") {
setDaemon(true)
override def run() {
receive()
}
}.start()
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
try {
val buffer = new Array[Byte](2048)
// Create a packet to receive data into the buffer
val packet = new DatagramPacket(buffer, buffer.length)
udpSocket.receive(packet)
val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
// Now loop forever, waiting to receive packets and printing them.
while (!isStopped() && iterator.hasNext) {
store(iterator.next())
}
if (!isStopped()) {
restart("Udp socket data stream had no more data")
}
} catch {
case NonFatal(e) =>
restart("Error receiving data", e)
} finally {
onStop()
}
}
override def onStop(): Unit = {
synchronized {
if (udpSocket != null) {
udpSocket.close()
udpSocket = null
}
}
}
}
为了让StreamingContext
在自身上添加一个方法,我们用一个隐式类来丰富它:
object Implicits {
implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
def udpSocketStream[T: ClassTag](host: String,
port: Int,
converter: InputStream => Iterator[T],
storageLevel: StorageLevel): InputDStream[T] = {
new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
}
}
}
这就是你如何调用它:
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.reflect.ClassTag
object TestRunner {
import Implicits._
def main(args: Array[String]): Unit = {
val sparkContext = new SparkContext("local[*]", "udpTest")
val ssc = new StreamingContext(sparkContext, Seconds(4))
val stream = ssc.udpSocketStream("localhost",
3003,
bytesToLines,
StorageLevel.MEMORY_AND_DISK_SER_2)
stream.print()
ssc.start()
ssc.awaitTermination()
}
def bytesToLines(inputStream: InputStream): Iterator[String] = {
val dataInputStream = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))
new NextIterator[String] {
protected override def getNext(): String = {
val nextValue = dataInputStream.readLine()
if (nextValue == null) {
finished = true
}
nextValue
}
protected override def close() {
dataInputStream.close()
}
}
}
abstract class NextIterator[U] extends Iterator[U] {
protected var finished = false
private var gotNext = false
private var nextValue: U = _
private var closed = false
override def next(): U = {
if (!hasNext) {
throw new NoSuchElementException("End of stream")
}
gotNext = false
nextValue
}
override def hasNext: Boolean = {
if (!finished) {
if (!gotNext) {
nextValue = getNext()
if (finished) {
closeIfNeeded()
}
gotNext = true
}
}
!finished
}
def closeIfNeeded() {
if (!closed) {
closed = true
close()
}
}
protected def getNext(): U
protected def close()
}
}
这段代码大部分取自Spark提供的SocketInputDStream[T]
,我简单的复用了一下。我还获取了 bytesToLines
使用的 NextIterator
的代码,它所做的只是消耗数据包中的行并将其转换为 String
。如果您有更复杂的逻辑,您可以通过传递 converter: InputStream => Iterator[T]
您自己的实现来提供它。
使用简单的 UDP 数据包对其进行测试:
echo -n "hello hello hello!" >/dev/udp/localhost/3003
产量:
-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!
当然,这个还有待进一步测试。我还有一个隐藏的假设,即从 DatagramPacket
创建的每个 buffer
都是 2048 字节,这可能是您想要更改的内容。
关于scala - Spark Scala UDP 在监听端口上接收,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41320483/
我遇到了一个奇怪的问题,我以前从未真正体验过这个,这是服务器的代码(在这种情况下客户端是 firefox),我创建它的方式: _Socket = new Socket( AddressFamily.I
我正在使用 C 编程语言在 Ubuntu 中开发原始套接字程序。由于我使用原始套接字,因此需要使用 SOCK_RAW 类型而不是 SOCK_STREAM。使用 SOCK_RAW 反过来会通过抛出 来禁
我实际上在客户端-客户端应用程序方面遇到了麻烦。这个问题中的一切都与Unix网络编程环境有关。 这是我的情况: 我有一个客户端(从现在起称为 C1),它调用 listen()在套接字上。 C1 输入
是否可以监听 QTcpSocket?我在 Tcp 上有一个简单的 p2p 连接。但是我找不到在随机自由端口上启动 QTcpSocket 的方法。我应该为此使用 QTcpServer,还是仅对 1 个连
我正在尝试根据 this documentation 监听码头更改事件和 ACTION_DOCK_EVENT . 我的接收器从未被击中。 我的代码看起来很简单,所以我想知道监听停靠事件是否需要权限?我
在 Linux 3.9 内核和更高版本中运行,我有一个应用程序 X,它在特定套接字上监听连接。我想写一个不相关的应用程序 Y,它跟踪尝试连接到此套接字的次数、源 IP 等。 是否可以在 c++ 中(最
是否可以监听日志条目?即是否存在用于附加日志条目的广播 Intent ? 最佳答案 没有 Intent 。 使用下面的代码: try { Process mLogcatProc
我已经实现了 installTap 方法,它为我提供了音频缓冲区浮点示例。我已经通过我的 C++ DSP 库过滤了它们。我想将此缓冲区“发送”到耳机/扬声器。我从示例中再次执行了 AVAudioPCM
我正在尝试启动一个在后台运行的服务,该服务正在监听 ACTION_SCREEN_OFF,当它找到 ACTION_SCREEN_OFF 时,启动我的 Activity 。 我在某处读到您需要创建一个Br
我正在开发一个 HOC,以帮助处理我的 React 应用程序中的表单(用于练习)。 // components/Wrapper.js import React from 'react'; export
我有“服务器端”mqtt 客户端,用于监视和管理远程 mqtt 客户端。我想扩展此服务器模块以密切关注远程客户端的连接。 在正常操作期间,远程客户端定期 PING 代理,根据代理日志: 1532924
我正在编写一个检查电池容量的守护进程。这是用于运行 Linux 的太阳能嵌入式设备。我读过使用 sleep() 是个坏主意在守护进程中,因此我正在尝试使用事件。所以我写了一些 PoC,但我没有收到任何
是否有一个库或技术来监听 Swing ui 对象上的所有可变事件?具体数据。 例如,我有一个带有 JTextArea、JCheckBox、JComboBox 等的 JPanel。有没有一种通用的方法可
使用 KineticJS,是否可以仅绑定(bind)函数一次?就像 jQuery 的等价物一样......例如。在 jQuery 中 // bad $('.wrap a').on('click', m
我正在使用 jquery 制作一个非常简单的富文本编辑器...我不想使用第三方的。 我需要监听 iframe(同一域等)内的事件,从输入开始。显然我需要经常使用bind()。 这就是我目前所拥有的,它
我有一个函数可以获取 XML 文档并根据 XSL 文档对其进行转换。然后,它将结果放入 id 为 laneconfigdisplay 的 div 中。我想要做的是,与转换逻辑分开,为该 div 设置一
如何检测 NSTextView 中的一行(即“\n”字符)被删除?我可以使用 textView:doCommandBySelector: 轻松监听新行并监听 "insertNewLine:"。 (参见
我有以下代码订阅 VisiblePosition 的属性更改事件Column 的属性(property)类(class): DependencyPropertyDescriptor dpd = Dep
我正在尝试在本地主机上收听浏览器的 DNS 请求。 我写了这段代码: WSADATA wsaData; unsigned char hostname[100]; int sockfd; struct
如何以 JSON 对象的形式接收对我网站上特定页面的回调?这些回调是从第三方 API 发出的,用于报告我与该 API 的通信状态。 我想过使用node.js的http.server.listen,但我
我是一名优秀的程序员,十分优秀!