
scala - Getting the file name from a Spark Streaming DStream RDD

Reposted. Author: 行者123. Updated: 2023-12-04 04:16:59

Spark Streaming's textFileStream and fileStream can monitor a directory and process the new files in the DStream's RDDs.

How can I get the names of the files that a DStream RDD is processing in a given interval?

Best Answer

fileStream produces a UnionRDD of NewHadoopRDDs. The nice thing about the NewHadoopRDDs created by sc.newAPIHadoopFile is that their name is set to their path.

Here is an example of what you can do with that knowledge:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import scala.reflect.ClassTag

def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform( rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map( dep =>
          // Each dependency is a per-file NewHadoopRDD whose name is the file path;
          // propagate that name onto the mapped RDD so later transforms can read it.
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]]
            .map(_._2.toString)
            .setName(dep.rdd.name)
        )
      )
    )

def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.map { dep =>
      if (dep.rdd.isEmpty) None
      else {
        // The dependency's name is the source file path set in namedTextFileStream.
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      }
    }.flatten  // drop the Nones left by empty files
  )
}
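The Option/flatten trick above (wrap each non-empty result in Some, skip empty RDDs with None, then flatten) works the same way on plain Scala collections. A minimal, Spark-free illustration:

```scala
object FlattenDemo {
  def main(args: Array[String]): Unit = {
    // Stand-ins for per-file results: None marks an empty file to be skipped.
    val perFile: Seq[Option[String]] = Seq(Some("a.txt"), None, Some("b.txt"))
    // flatten keeps only the present values, discarding the Nones.
    println(perFile.flatten)
  }
}
```

This is why transformByFile can map each dependency to an Option and still hand UnionRDD a clean sequence of RDDs.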

def main(args: Array[String]) = {
  val conf = new SparkConf()
    .setAppName("Process by file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  val dstream = namedTextFileStream(ssc, "/some/directory")

  // Pair every line with the name of the file it came from.
  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream.transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do some stuff with transformed

  ssc.start()
  ssc.awaitTermination()
}
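No cluster is needed to see the shape of the result. Below is a plain-Scala simulation of the same per-file tagging idea; the NamedPart class and the file paths are made up for illustration, standing in for the named per-file RDDs:

```scala
// Hypothetical stand-in for a per-file RDD: a chunk of lines that
// remembers the file it came from, mirroring how NewHadoopRDD keeps
// its input path in `name`.
case class NamedPart(name: String, lines: Seq[String])

object NamedUnionDemo {
  // Mimics transformByFile: apply a filename-aware function to every
  // line of each non-empty part, then flatten into one collection
  // (the "union").
  def transformByFile[U](parts: Seq[NamedPart])(f: String => String => U): Seq[U] =
    parts.filter(_.lines.nonEmpty).flatMap(p => p.lines.map(f(p.name)))

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      NamedPart("hdfs:///logs/a.txt", Seq("line1", "line2")),
      NamedPart("hdfs:///logs/b.txt", Seq("line3"))
    )
    // Each element of the result is a (filename, line) pair.
    val tagged = transformByFile(parts)(file => line => (file, line))
    tagged.foreach(println)
  }
}
```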

Regarding "scala - Getting the file name from a Spark Streaming DStream RDD", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29031276/
