gpt4 book ai didi

scala - 对于 DStream 中的每个 RDD,如何将其转换为数组或其他一些典型的 Java 数据类型?

转载 作者:行者123 更新时间:2023-12-04 13:27:06 25 4
gpt4 key购买 nike

我想将 DStream 转换为数组、列表等,然后我可以将其转换为 json 并在端点上提供它。我正在使用 apache spark,注入(inject) twitter 数据。如何在 Dstream statuses 上执行此操作?除了print(),我似乎什么也做不了.

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
def main(args: Array[String]) {

// Location of the Spark directory
val sparkHome = "/opt/spark"

// URL of the Spark cluster
val sparkUrl = "local[8]"

// Location of the required JAR files
val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"

// HDFS directory for checkpointing
val checkpointDir = "/tmp"

// Configure Twitter credentials using twitter.txt
TutorialHelper.configureTwitterCredentials()

val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))

val filters = Array("#americasgottalent", "iamawesome")
val tweets = TwitterUtils.createStream(ssc, None, filters)

val statuses = tweets.map(status => status.getText())

val arry = Array("firstval")
statuses.foreachRDD {
arr :+ _.collect()
}

ssc.checkpoint(checkpointDir)

ssc.start()
ssc.awaitTermination()
}
}

最佳答案

转身我们的你很接近,但我最终寻找的是。

statuses.foreachRDD( rdd => {
for(item <- rdd.collect().toArray) {
println(item);
}
})

关于scala - 对于 DStream 中的每个 RDD,如何将其转换为数组或其他一些典型的 Java 数据类型?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24772799/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com