
java - Multiple running SparkContexts detected in the same JVM - Java Spark


I am trying to use a JavaSparkContext to read data from a MongoDB collection. So I have the following utility:

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkUtil {

    private String host;
    private Integer port;
    private String database;

    public final static Logger log = Logger.getLogger(SparkUtil.class);

    private static final String SPARK_MONGO_INPUT_URI = "spark.mongodb.input.uri";
    private static final String SPARK_MONGO_OUTPUT_URI = "spark.mongodb.output.uri";

    private static SparkConf conf;
    private static JavaSparkContext jsc;

    /**
     * @param host             the MongoDB host
     * @param port             the MongoDB port
     * @param database         the MongoDB database name
     * @param master           the Spark master URL
     * @param appname          the Spark application name
     * @param inputCollection  the MongoDB collection to read from
     * @param outputCollection the MongoDB collection to write to
     */
    public SparkUtil(final String host, final Integer port, final String database,
            final String master, final String appname, final String inputCollection,
            final String outputCollection) {
        try {
            this.host = host;
            this.port = port;
            this.database = database;
            // formatMongoURI(...) builds the mongodb:// URI from host, port
            // and database; its body is not shown here.
            String inputURI = this.formatMongoURI(inputCollection);
            String outputURI = this.formatMongoURI(outputCollection);
            log.info("----------------------------------------------------");
            log.info("Mongo Input URI: " + inputURI);
            log.info("Mongo Output URI: " + outputURI);
            log.info("----------------------------------------------------");
            conf = new SparkConf()
                    .setMaster(master)
                    .setAppName(appname)
                    .set(SPARK_MONGO_INPUT_URI, inputURI)
                    .set(SPARK_MONGO_OUTPUT_URI, outputURI)
                    .set("spark.driver.allowMultipleContexts", "true");
            SparkContext sc = new SparkContext(conf);
            jsc = JavaSparkContext.fromSparkContext(sc);
        } catch (Exception ex) {
            log.error(ex.getMessage());
        }
    }
}

However, I get the following error:

16:19:58.929 [main] DEBUG org.spark_project.jetty.util.component.AbstractLifeCycle - STARTED @36813ms o.s.j.s.ServletContextHandler@de81be1{/metrics/json,null,AVAILABLE}
16:19:58.931 [main] WARN org.apache.spark.SparkContext - Multiple running SparkContexts detected in the same JVM!
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:77)
cloudos.utils.SparkUtil.<init>(SparkUtil.java:65)
utils.SparkUtilTest.setUp(SparkUtilTest.java:47)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2223)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2219)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2219)
at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:2305)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:2175)
at cloudos.utils.SparkUtil.<init>(SparkUtil.java:65)
at utils.SparkUtilTest.setUp(SparkUtilTest.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at org.testng.junit.JUnit4TestRunner.start(JUnit4TestRunner.java:81)
at org.testng.junit.JUnit4TestRunner.run(JUnit4TestRunner.java:69)
at org.testng.TestRunner$1.run(TestRunner.java:689)
at org.testng.TestRunner.runWorkers(TestRunner.java:1014)
at org.testng.TestRunner.privateRunJUnit(TestRunner.java:720)
at org.testng.TestRunner.run(TestRunner.java:621)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:359)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:354)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:312)
at org.testng.SuiteRunner.run(SuiteRunner.java:261)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1191)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
at org.testng.TestNG.run(TestNG.java:1024)
at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:115)
at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:129)
at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:113)
at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:111)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
16:19:58.932 [main] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext from cache with key [[WebMergedContextConfiguration@1fa121e2 testClass = SparkUtilTest, locations = '{}', classes = '{class utils.SparkUtilTest$ContextConfiguration}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{classpath:test.properties}', propertySourceProperties = '{org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true}', contextCustomizers = set[org.springframework.boot.test.context.SpringBootTestContextCustomizer@6aba2b86, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@47af7f3d, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0], resourceBasePath = 'src/main/webapp', contextLoader = 'org.springframework.test.context.support.AnnotationConfigContextLoader', parent = [null]]]
16:19:58.932 [main] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@4b7dc788 size = 1, maxSize = 32, parentContextCount = 0, hitCount = 12, missCount = 1]

I am running it from the following unit test:

import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;

import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.support.AnnotationConfigContextLoader;

@RunWith(SpringRunner.class)
@SpringBootTest
@TestPropertySource(value = "classpath:test.properties")
@ContextConfiguration(loader = AnnotationConfigContextLoader.class)
public class SparkUtilTest {

    public final static Logger log = Logger.getLogger(SparkUtilTest.class);

    private SparkUtil sparkUtil;

    @Value("${spring.data.mongodb.host}")
    private String host;

    @Value("${spring.data.mongodb.port}")
    private Integer port;

    @Value("${spring.data.mongodb.database}")
    private String database;

    @Configuration
    @TestPropertySource(value = "classpath:test.properties")
    static class ContextConfiguration {
    }

    @Before
    public void setUp() throws Exception {
        this.sparkUtil = new SparkUtil(this.host, this.port, this.database, "local",
                "AmazonML", "aws_instances", "aws_instances");
    }

    @Test
    public void testGetMethods() {
        assertNotNull(this.sparkUtil.getJavaSparkContext());
        assertNotNull(this.sparkUtil.getSparkConfig());
    }

    @Test
    public void testRead() {
        JavaRDD<Document> rdd = this.sparkUtil.read();
        assertNotNull(rdd);
        assertNotEquals(rdd.count(), 0);
        log.info("-------------------------------------");
        log.info("Count: " + rdd.count());
        log.info("Object: " + rdd.first().toJson());
        log.info("-------------------------------------");
    }

}

I have already used set("spark.driver.allowMultipleContexts", "true"), but it makes no difference. I am using Scala 2.12, mongo-spark-connector_2.11, spark-sql_2.11, and Spark 1.6.3. How can I fix my problem?

Best Answer

I think this is caused by your setUp() method, which is annotated with @Before and is therefore called before every @Test. In this case it is called twice, because you have two tests, and that is why two SparkContexts get created.

The simplest solution seems to be replacing @Before with @BeforeClass, although that only postpones the problem (until you add more test classes). Still, you can try it now and see whether it helps; a sketch of the change follows.
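For illustration, a minimal sketch of that change inside SparkUtilTest. Note that JUnit requires @BeforeClass methods (and hence the field) to be static, so the instance @Value fields are not available there; the connection settings below are hard-coded placeholder values, not part of the original post:

    // Shared by all tests in this class; JUnit requires @BeforeClass
    // methods, and therefore this field, to be static.
    private static SparkUtil sparkUtil;

    @BeforeClass
    public static void setUpClass() {
        // Runs once for the whole class instead of once per @Test,
        // so only a single SparkContext is created for both tests.
        // "localhost", 27017 and "test" are placeholders: a static
        // method cannot read the instance @Value fields.
        sparkUtil = new SparkUtil("localhost", 27017, "test", "local",
                "AmazonML", "aws_instances", "aws_instances");
    }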

You can also try stopping the SparkContext after each test (using @After, or @AfterClass if you change @Before to @BeforeClass), as shown below. I think stopping it when it is no longer needed is good practice anyway.
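A sketch of the matching teardown, assuming SparkUtil exposes the context through the getJavaSparkContext() getter already used in the tests above:

    @AfterClass
    public static void tearDownClass() {
        // Stop the shared context so another test class (or a later
        // test run in the same JVM) can create its own SparkContext.
        if (sparkUtil != null && sparkUtil.getJavaSparkContext() != null) {
            sparkUtil.getJavaSparkContext().stop();
        }
    }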

Another solution is to make sure SparkUtil is created only once per JVM. You could simply use the singleton pattern, but then you would have no easy way to stop the SparkContext. Perhaps Spring Boot provides a better mechanism for initialization and cleanup?
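As a rough illustration of the singleton idea (SparkUtilHolder is a hypothetical class name, and the connection settings are again placeholder values):

    public final class SparkUtilHolder {

        private static SparkUtil instance;

        private SparkUtilHolder() {
            // No instances; this class only guards the single SparkUtil.
        }

        // Lazily creates the single SparkUtil on first use; later calls
        // reuse it, so at most one SparkContext is created in this JVM.
        public static synchronized SparkUtil getInstance() {
            if (instance == null) {
                instance = new SparkUtil("localhost", 27017, "test", "local",
                        "AmazonML", "aws_instances", "aws_instances");
            }
            return instance;
        }
    }

The drawback mentioned above still applies: nothing in this holder ever calls stop() on the underlying SparkContext.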

Regarding java - Multiple running SparkContexts detected in the same JVM - Java Spark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40711076/
