gpt4 book ai didi

spring-boot - 如何使用 TestContainers 创建用于集成测试的 apache spark 独立集群?

转载 作者:行者123 更新时间:2023-12-04 17:31:27 28 4
gpt4 key购买 nike

有谁知道如何使用 testContainers https://www.testcontainers.org/ 创建一个用于集成测试的 apache-spark 集群

请提供任何运行示例,我正在努力找到它。

最佳答案

我能够使用 GenericContainer 类和 bitnami/spark 图像创建这种集成测试。它是以下代码(我是为将数据帧写入 AWS SQS 的 library 编写的)。
这个想法是创建一个 Spark 容器(在这种情况下,它不是一个集群,而只是主节点),复制运行测试所需的所有文件(一些 Python 文件和所有依赖项),发出 spark-submit 命令和检查最终状态(另一个容器中 Localstack 的 SQS 服务中的消息)。

    @Testcontainers
public class SparkIntegrationTest {

private static Network network = Network.newNetwork();

@Container
public LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.13"))
.withNetwork(network)
.withNetworkAliases("localstack")
.withServices(SQS);

@Container
public GenericContainer spark = new GenericContainer(DockerImageName.parse("bitnami/spark:3.1.2"))
.withCopyFileToContainer(MountableFile.forHostPath("build/resources/test/.", 0744), "/home/")
.withCopyFileToContainer(MountableFile.forHostPath("build/libs/.", 0555), "/home/")
.withNetwork(network)
.withEnv("AWS_ACCESS_KEY_ID", "test")
.withEnv("AWS_SECRET_KEY", "test")
.withEnv("SPARK_MODE", "master");

@Test
public void shouldPutASQSMessageInLocalstackUsingSpark() throws IOException, InterruptedException {
String expectedBody = "my message body"; // the same value in resources/sample.txt

AmazonSQS sqs = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(localstack.getEndpointConfiguration(SQS))
.withCredentials(localstack.getDefaultCredentialsProvider())
.build();
sqs.createQueue("my-test");

org.testcontainers.containers.Container.ExecResult lsResult =
spark.execInContainer("spark-submit",
"--jars", "/home/spark-aws-messaging-0.3.1.jar,/home/deps/aws-java-sdk-core-1.12.12.jar,/home/deps/aws-java-sdk-sqs-1.12.12.jar",
"--master", "local",
"/home/sqs_write.py",
"/home/sample.txt",
"http://localstack:4566");

System.out.println(lsResult.getStdout());
System.out.println(lsResult.getStderr());

assertEquals(0, lsResult.getExitCode());

String queueUrl = sqs.getQueueUrl("my-test").getQueueUrl()
.replace("localstack", localstack.getContainerIpAddress());
List<Message> messages = sqs.receiveMessage(queueUrl)
.getMessages();
assertEquals(expectedBody, messages.get(0).getBody());
}
}

关于spring-boot - 如何使用 TestContainers 创建用于集成测试的 apache spark 独立集群?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59245356/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com