
hadoop - How to move files in my Spark Streaming application


Here I stream data from a streaming directory and write it to an output location. I am also trying to implement a step that moves HDFS files from an input folder into the streaming directory. Right now this step runs only once, before the streaming context starts, but I want it to run for every batch of the DStream. Is that possible?

    val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat](streaming_directory, (t: Path) => true, true)
      .map { case (x, y) => y.toString }

    streamed_rdd.foreachRDD { rdd =>
      // For every batch, push the fourth tab-separated field of each record to ActiveMQ.
      rdd.map(x => x.split("\t")).map(x => x(3)).foreachPartition { partitionOfRecords =>
        val connection: Connection = connectionFactory.createConnection()
        connection.setClientID("Email_send_module_client_id")
        println("connection started with active mq")
        val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        println("created session")
        val dest = session.createQueue("dwEmailsQueue2")
        println("destination queue name = dwEmailsQueue2")
        val prod_queue = session.createProducer(dest)
        connection.start()
        partitionOfRecords.foreach { record =>
          val rec_to_send: TextMessage = session.createTextMessage(record)
          println("started creating a text message")
          prod_queue.send(rec_to_send)
          println("sent the record")
        }
        connection.close()
      }
    }

    // The block below is the move step in question: it runs only once, on the
    // driver, before the streaming context starts. (`!!` needs scala.sys.process._)
    val LIST = scala.collection.mutable.MutableList[String]()
    val files_to_move = scala.collection.mutable.MutableList[String]()
    val cmd = "hdfs dfs -ls -d " + load_directory + "/*"
    println(cmd)
    val system_time = System.currentTimeMillis
    println(system_time)
    val output = cmd.!!
    output.split("\n").foreach(x => x.split(" ").foreach(x => if (x.startsWith("/user/hdpprod/")) LIST += x))
    // Keep only files whose trailing "_<millis>" timestamp is in the past.
    LIST.foreach(x => if (x.toString.split("/").last.split("_").last.toLong < system_time) files_to_move += x)
    println("files to move " + files_to_move)
    var mv_cmd: String = "hdfs dfs -mv "
    for (file <- files_to_move) {
      mv_cmd += file + " "
    }
    mv_cmd += streaming_directory
    println(mv_cmd)
    val mv_output = mv_cmd.!!
    println("moved the data to the folder")

    // Note: count() on a DStream returns a DStream[Long], not a number,
    // so this comparison with the string "0" never matches.
    if (streamed_rdd.count().toString == "0") {
      println("no data in the streamed list")
    } else {
      println("saving the Dstream at " + System.currentTimeMillis())
      streamed_rdd.transform(rdd => rdd.map(x => check_time_to_send + "\t" + check_time_to_send_utc + "\t" + x))
        .saveAsTextFiles("/user/hdpprod/temp/spark_streaming_output_sent/sent")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
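
For reference, a minimal sketch of one way to get the per-batch behavior asked about here: wrap the move step in a helper and call it inside foreachRDD, which Spark runs once per batch on the driver. The sketch uses the Hadoop FileSystem API instead of shelling out to hdfs dfs; the helper name moveStagedFiles is illustrative, not from the original code, and it assumes the question's convention that staged file names end in an "_<millis>" timestamp.

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustrative helper: move files whose trailing "_<millis>" timestamp is in
    // the past from the load directory into the streaming directory.
    def moveStagedFiles(fs: FileSystem, loadDir: String, streamDir: String): Unit = {
      val now = System.currentTimeMillis
      fs.listStatus(new Path(loadDir))
        .map(_.getPath)
        .filter { p =>
          val suffix = p.getName.split("_").last
          suffix.nonEmpty && suffix.forall(_.isDigit) && suffix.toLong < now
        }
        .foreach(p => fs.rename(p, new Path(streamDir, p.getName)))
    }

    val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
    streamed_rdd.foreachRDD { rdd =>
      moveStagedFiles(fs, load_directory, streaming_directory) // once per batch, on the driver
      // ... per-batch processing of rdd ...
    }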

Best Answer

I tried to do the same thing in a Java implementation, as shown below. You can call this method from foreachRDD, passing it each batch's RDD.

 // Assumed imports for this snippet:
 //   org.apache.spark.Partition, org.apache.spark.api.java.JavaRDD,
 //   org.apache.spark.rdd.UnionPartition, org.apache.spark.rdd.NewHadoopPartition,
 //   java.io.File, java.nio.file.StandardCopyOption
 public static void moveFiles(final String moveFilePath, final JavaRDD rdd) {
     for (final Partition partition : rdd.partitions()) {
         // Recover the Hadoop input split behind each partition; its string form
         // contains the path(s) of the file(s) the partition was read from.
         final UnionPartition unionPartition = (UnionPartition) partition;
         final NewHadoopPartition newHadoopPartition =
                 (NewHadoopPartition) unionPartition.parentPartition();
         final String fPath = newHadoopPartition.serializableHadoopSplit()
                 .value().toString();
         final String[] filespaths = fPath.split(":");

         if ((filespaths != null) && (filespaths.length > 0)) {
             for (final String filepath : filespaths) {
                 if ((filepath != null) && filepath.contains("/")) {
                     final File file = new File(filepath);

                     if (file.exists() && file.isFile()) {
                         try {
                             File destFile = new File(moveFilePath + "/" + file.getName());

                             // Avoid clobbering an existing file with the same name.
                             if (destFile.exists()) {
                                 destFile = new File(moveFilePath + "/" + file.getName() + "_");
                             }

                             java.nio.file.Files.move(file.toPath(), destFile.toPath(),
                                     StandardCopyOption.REPLACE_EXISTING);
                         } catch (Exception e) {
                             logger.error("Exception while moving file", e);
                         }
                     }
                 }
             }
         }
     }
 }
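
A hedged usage sketch in Scala (the enclosing class name FileMover and the destination path are hypothetical; the answer shows only the method itself): call it once per batch from foreachRDD, converting the batch's RDD to a JavaRDD.

    streamed_rdd.foreachRDD { rdd =>
      // Driver-side, once per batch: move this batch's underlying input files
      // after processing. FileMover is a hypothetical class wrapping the
      // moveFiles method shown above.
      FileMover.moveFiles("/user/hdpprod/processed", rdd.toJavaRDD())
    }

One caveat: java.nio.file.Files.move operates on the local filesystem, so this works as written only when the streamed files live on a local (or locally mounted) path; for files in HDFS, the Hadoop FileSystem.rename API (as in the sketch after the question above) would be needed instead.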

Regarding "hadoop - How to move files in my Spark Streaming application", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40517884/
