gpt4 book ai didi

scala - 使用 testcontainers 测试 kafka 和 spark

转载 作者:行者123 更新时间:2023-12-04 13:51:37 30 4
gpt4 key购买 nike

我正在尝试使用 testcontainers 检查流管道作为集成测试,但我不知道如何获取 bootstrapServers,至少在最后一个 testcontainers 版本中并在那里创建特定主题。如何使用“containerDef”来提取引导服务器并添加主题?

import com.dimafeng.testcontainers.{ContainerDef, KafkaContainer}
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession

class Mykafkatest extends FunSuite with TestContainerForAll {
//val kafkaContainer: KafkaContainer = KafkaContainer("confluentinc/cp-kafka:5.4.3")
override val containerDef: ContainerDef = KafkaContainer.Def()

test("do something")(withContainers { container =>
val sparkSession: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Unit testing")
.getOrCreate()

// How add a topic in that container?

// This is not posible:
val servers=container.bootstrapServers

val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "topic1")
.load()


df.show(false)

})

}
我的 sbt 配置:
lazy val root = project
.in(file("./pipeline"))
.settings(
organization := "org.example",
name := "spark-stream",
version := "0.1",
scalaVersion := "2.12.10",
libraryDependencies := Seq(
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.3" % Compile,
"org.apache.spark" %% "spark-sql" % "3.0.3" % Compile,
"com.dimafeng" %% "testcontainers-scala-munit" % "0.39.5" % Test,
"org.dimafeng" %% "testcontainers-scala-kafka" % "0.39.5" % Test,
"org.scalameta" %% "munit" % "0.7.28" % Test
),
testFrameworks += new TestFramework("munit.Framework"),
Test / fork := true
)
文档未显示完整示例: https://www.testcontainers.org/modules/kafka/

最佳答案

这里唯一的问题是您明确地转换了 KafkaContainer.DefContainerDef .withContianers提供的容器类型, Containterpath dependent type决定在提供 ContainerDef ,

trait TestContainerForAll extends TestContainersForAll { self: Suite =>

val containerDef: ContainerDef

final override type Containers = containerDef.Container

override def startContainers(): containerDef.Container = {
containerDef.start()
}

// inherited from TestContainersSuite
def withContainers[A](runTest: Containers => A): A = {
val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
runTest(c)
}

}
trait ContainerDef {

type Container <: Startable with Stoppable

protected def createContainer(): Container

def start(): Container = {
val container = createContainer()
container.start()
container
}
}
您明确指定类型的那一刻 ContainerDefoverride val containerDef: ContainerDef = KafkaContainer.Def() ,这打破了整个“类型欺骗”,因此 Scala 编译器留下了 type Container <: Startable with Stoppable而不是 KafkaContainer .
所以,只需删除显式类型转换为 ContainerDef ,以及 val servers = container.bootstrapServers将按预期工作。
import com.dimafeng.testcontainers.KafkaContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
import munit.FunSuite

class Mykafkatest extends FunSuite with TestContainerForAll {
override val containerDef = KafkaContainer.Def()

test("do something")(withContainers { container =>
//...

val servers = container.bootstrapServers

println(servers)

//...
})
}

关于scala - 使用 testcontainers 测试 kafka 和 spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68914485/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com