
apache-spark - How to generate complex XML with Spark-Xml


I am trying to generate a complex XML from my JavaRDD&lt;Book&gt; and JavaRDD&lt;Review&gt;. How can I use both of them to generate the XML below?

<xml>
<library>
    <books>
        <book>
            <author>test</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
    </reviews>
</library>

As you can see, there is a parent root element library that contains the children Books and Reviews.

Below is how I generate the Book and Review DataFrames:
DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);

I know how to generate XML in general; my doubt is specifically about getting the Library root tag with Books and Reviews as its children.

I am using Java, but feel free to write a Scala or Python example if you can point me in the right direction.

Best Answer

This may not be the most efficient way to do this with Spark, but the code below works as you want. (It's Scala, though, since my Java is a bit rusty.)

import java.io.{File, PrintWriter}
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.io.Source

val spark = SparkSession.builder()
  .master("local[3]")
  .appName("test")
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()

import spark.implicits._

/* Some code to test */
case class Book(author: String)
case class Review(id: Int)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)


val bookFrame = List(
  Book("book1"),
  Book("book2"),
  Book("book3"),
  Book("book4"),
  Book("book5")
).toDS()


val reviewFrame = List(
  Review(1),
  Review(2),
  Review(3),
  Review(4)
).toDS()

/* End test code */

// Using databricks api save as 1 big xml file (instead of many parts, using repartition)
// You don't have to use repartition, but each part-xxx file will wrap contents in the root tag, making it harder to concat later.
// And TBH it really doesn't matter that Spark is doing the merging here, since the combining of data is already on the master node only
bookFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .mode(SaveMode.Overwrite)
  .save("/tmp/books/") // store to temp location

// Same for reviews
reviewFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "reviews")
  .option("rowTag", "review")
  .mode(SaveMode.Overwrite)
  .save("/tmp/review") // store to temp location


def concatFiles(path: String): List[String] = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .flatMap(file => Source.fromFile(file.getAbsolutePath).getLines())
    .map(" " + _) // prefix with spaces to allow for new root level xml
    .toList
}

val lines = List("<xml>", "<library>") ++ concatFiles("/tmp/books/") ++ concatFiles("/tmp/review/") ++ List("</library>")
new PrintWriter("/tmp/target.xml") {
  write(lines.mkString("\n"))
  close()
}

Result:

<xml>
<library>
    <books>
        <book>
            <author>book1</author>
        </book>
        <book>
            <author>book2</author>
        </book>
        <book>
            <author>book3</author>
        </book>
        <book>
            <author>book4</author>
        </book>
        <book>
            <author>book5</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
        <review>
            <id>2</id>
        </review>
        <review>
            <id>3</id>
        </review>
        <review>
            <id>4</id>
        </review>
    </reviews>
</library>

Another approach could be (using Spark only) to create a new object, case class BookReview(books: List[Book], reviews: List[Review]), and store that as XML after you .collect() all books and reviews into a single list.

Although then I would not use Spark to process a single record (the BookReview), but a normal XML library (like XStream or so) to store this object.
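
A minimal sketch of that alternative, assuming both datasets are small enough to .collect() to the driver; it uses scala.xml literals instead of XStream purely for illustration (scala-xml has to be on the classpath), and the output path is a placeholder:

// Sketch: collect everything to the driver and build the XML with scala.xml
case class BookReview(books: List[Book], reviews: List[Review])

val bookReview = BookReview(bookFrame.collect().toList, reviewFrame.collect().toList)

val xml =
  <library>
    <books>{ bookReview.books.map(b => <book><author>{ b.author }</author></book>) }</books>
    <reviews>{ bookReview.reviews.map(r => <review><id>{ r.id }</id></review>) }</reviews>
  </library>

scala.xml.XML.save("/tmp/target3.xml", xml) // placeholder output path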

UPDATE
The List concat method is not very memory friendly, so using streams and buffers could be a solution instead of the concatFiles method.

import java.io.{BufferedOutputStream, BufferedReader, FileInputStream, FileOutputStream, InputStreamReader}

def outputConcatFiles(path: String, outputFile: File): Unit = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .foreach(file => {
      val writer = new BufferedOutputStream(new FileOutputStream(outputFile, true))
      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))

      try {
        Stream.continually(reader.readLine())
          .takeWhile(_ != null)
          .foreach(line =>
            writer.write(s" $line\n".getBytes)
          )
      } catch {
        case e: Exception => println(e.getMessage)
      } finally {
        writer.close()
        reader.close()
      }
    })
}

val outputFile = new File("/tmp/target2.xml")
new PrintWriter(outputFile) { write("<xml>\n<library>\n"); close}
outputConcatFiles("/tmp/books/", outputFile)
outputConcatFiles("/tmp/review/", outputFile)
new PrintWriter(new FileOutputStream(outputFile, true)) { append("</library>"); close}

Regarding apache-spark - How to generate complex XML with Spark-Xml, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/49729896/
