
scala - Testing a utility function by writing unit tests in Apache Spark Scala


I have a utility function written in Scala that reads Parquet files from an S3 bucket. Could someone help me write unit test cases for it?

Below is the function that needs to be tested.

def readParquetFile(spark: SparkSession,
                    locationPath: String): DataFrame = {
  spark.read
    .parquet(locationPath)
}

So far, I have created a SparkSession whose master is local:
import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("Test App").getOrCreate()
  }

}

I am stuck on testing this function. Below is the code where I am stuck. The question is whether I should create a real Parquet file and load it to verify that the DataFrame is created, or whether there is a mocking framework I can use to test it.
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.scalatest.FunSpec

class ReadAndWriteSpec extends FunSpec with DataFrameComparer with SparkSessionTestWrapper {

  import spark.implicits._

  it("reads a parquet file and creates a dataframe") {

  }

}
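
One way to get unstuck here, without mocking S3 at all, is to have the test itself write a small DataFrame to a temporary local directory as Parquet and read it back through the function under test. A minimal sketch of the test body, assuming readParquetFile lives in an object named ReadAndWrite (as in the edit below) and using plain assertions rather than a specific comparison helper:

it("reads a parquet file and creates a dataframe") {
  // Write a tiny DataFrame as Parquet into a temporary directory.
  val tmpDir = java.nio.file.Files.createTempDirectory("read-parquet-test").toString
  val expectedDF = Seq(("Michael", 29), ("Andy", 30)).toDF("name", "age")
  expectedDF.write.mode("overwrite").parquet(tmpDir)

  // Read it back through the function under test.
  val actualDF = ReadAndWrite.readParquetFile(spark, tmpDir)

  // Compare row contents; a set comparison avoids depending on row order.
  assert(actualDF.count() == expectedDF.count())
  assert(actualDF.collect().toSet == expectedDF.collect().toSet)
}

Since the suite already mixes in DataFrameComparer, the two asserts could also be replaced with spark-fast-tests' assertSmallDataFrameEquality; just be aware that columns read back from Parquet are marked nullable by Spark, so the schema may differ from the in-memory DataFrame unless nullability differences are ignored.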

Edit:

Based on the input from the comments, I came up with the following, but I still cannot figure out how to make use of it.

I am using https://github.com/findify/s3mock
class ReadAndWriteSpec extends FunSpec with DataFrameComparer with SparkSessionTestWrapper {

  import spark.implicits._

  it("reads a parquet file and creates a dataframe") {

    val api = S3Mock(port = 8001, dir = "/tmp/s3")
    api.start

    val endpoint = new EndpointConfiguration("http://localhost:8001", "us-west-2")
    val client = AmazonS3ClientBuilder
      .standard
      .withPathStyleAccessEnabled(true)
      .withEndpointConfiguration(endpoint)
      .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
      .build

    /** Use it as usual. */
    client.createBucket("foo")
    client.putObject("foo", "bar", "baz")
    val url = client.getUrl("foo", "bar")

    println(url.getFile())

    val df = ReadAndWrite.readParquetFile(spark, url.getPath())
    df.printSchema()

  }

}
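
Note that the snippet above only gets an object into the mock bucket through the AWS SDK client; Spark itself still needs to be pointed at the mock endpoint, because spark.read.parquet goes through Hadoop's s3a filesystem rather than the AmazonS3 client. A hedged sketch of the extra wiring, assuming hadoop-aws (with its matching aws-java-sdk) is on the test classpath and that a real Parquet file, not the string "baz", is uploaded; the local file path below is purely illustrative:

// Point Spark's s3a filesystem at the local S3Mock endpoint.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.endpoint", "http://localhost:8001")
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.connection.ssl.enabled", "false")
hadoopConf.set("fs.s3a.access.key", "dummy")
hadoopConf.set("fs.s3a.secret.key", "dummy")

// Upload an actual Parquet file (e.g. a part file previously written to a temp dir by Spark).
client.createBucket("foo")
client.putObject("foo", "people/part-00000.parquet", new java.io.File("/tmp/people-parquet/part-00000.parquet"))

// Read through the function under test using the s3a scheme instead of the client URL.
val df = ReadAndWrite.readParquetFile(spark, "s3a://foo/people/")
df.printSchema()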

Best Answer

I figured it out and kept it simple. I was able to get some basic test cases working.

Here is my solution. I hope it helps someone.

import org.apache.spark.sql
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import loaders.ReadAndWrite

class ReadAndWriteTestSpec extends FunSuite with BeforeAndAfterEach {

  private val master = "local"

  private val appName = "ReadAndWrite-Test"

  var spark: SparkSession = _

  override def beforeEach(): Unit = {
    spark = new sql.SparkSession.Builder().appName(appName).master(master).getOrCreate()
  }

  test("creating data frame from parquet file") {
    val sparkSession = spark
    import sparkSession.implicits._

    val peopleDF = spark.read.json("src/test/resources/people.json")
    peopleDF.write.mode(SaveMode.Overwrite).parquet("src/test/resources/people.parquet")

    val df = ReadAndWrite.readParquetFile(sparkSession, "src/test/resources/people.parquet")
    df.printSchema()
  }

  test("creating data frame from text file") {
    val sparkSession = spark
    import sparkSession.implicits._

    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession, "src/test/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()
  }

  test("counts should match with number of records in a text file") {
    val sparkSession = spark
    import sparkSession.implicits._

    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession, "src/test/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()

    assert(peopleDF.count() == 3)
  }

  test("data should match with sample records in a text file") {
    val sparkSession = spark
    import sparkSession.implicits._

    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession, "src/test/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()

    assert(peopleDF.take(1)(0)(0).equals("Michael"))
  }

  test("Write a data frame as csv file") {
    val sparkSession = spark
    import sparkSession.implicits._

    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession, "src/test/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()

    // the header argument should be a boolean, to avoid confusing users
    ReadAndWrite.writeDataframeAsCSV(peopleDF, "src/test/resources/out.csv", java.time.Instant.now().toString, ",", "true")
  }

  override def afterEach(): Unit = {
    spark.stop()
  }

}

case class Person(name: String, age: Int)
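
For completeness, these suites only need Spark SQL and ScalaTest on the test classpath (plus spark-fast-tests if the DataFrameComparer variant from the question is used). A hedged sbt sketch; the coordinates and version numbers below are illustrative assumptions and should be matched to your own Scala and Spark versions:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3",            // or mark as Provided to match your cluster
  "org.scalatest"    %% "scalatest" % "3.0.8" % Test,     // FunSuite / FunSpec live here in the 3.0.x line
  "com.github.mrpowers" %% "spark-fast-tests" % "0.21.3" % Test  // only for DataFrameComparer; check the project README for current coordinates
)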

Regarding "scala - Testing a utility function by writing unit tests in Apache Spark Scala", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55831503/
