gpt4 book ai didi

scala - 无法在已停止的 SparkContext 上调用方法

转载 作者:行者123 更新时间:2023-12-02 08:21:15 27 4
gpt4 key购买 nike

当我运行以下测试时,它会抛出“Cannot call methods on a stopped SparkContext”(无法在已停止的 SparkContext 上调用方法)。问题可能出在我同时使用了 TestSuiteBase 和 StreamingContext。在 `val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)` 这一行,我需要使用 SparkContext,而我是通过 ssc.sparkContext 来访问它的,问题正是出在这里(请参阅下面的警告和错误消息)。

// Grid-search style streaming test: for every combination of hyper-parameters it
// builds a fresh StreamingContext, runs the stream, and records the resulting RMSE.
class StreamingTest extends TestSuiteBase with BeforeAndAfter {

test("Test 1") {
//...
// One evaluation per (initialWeights, stepSize, numIterations) combination.
val gridEvals = for (initialWeights <- gridParams("initialWeights");
stepSize <- gridParams("stepSize");
numIterations <- gridParams("numIterations")) yield {
val lr = new StreamingLinearRegressionWithSGD()
.setInitialWeights(initialWeights.asInstanceOf[Vector])
.setStepSize(stepSize.asInstanceOf[Double])
.setNumIterations(numIterations.asInstanceOf[Int])

// `ssc` is reassigned on every iteration; it is declared in the elided part above.
ssc = setupStreams(inputData, (inputDStream: DStream[LabeledPoint]) => {
lr.trainOn(inputDStream)
lr.predictOnValues(inputDStream.map(x => (x.label, x.features)))
})

// NOTE(review): if `runStreams` is the TestSuiteBase implementation shown further
// down, it delegates to runStreamsWithPartitions, whose `finally` block calls
// ssc.stop(stopSparkContext = true). After this line `ssc` and its SparkContext
// are therefore already stopped.
val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
val cvRMSE = calculateRMSE(output, nPoints)
println(s"RMSE = $cvRMSE")
(initialWeights, stepSize, numIterations, cvRMSE)

}

// NOTE(review): `ssc` here is the context from the last loop iteration, which was
// stopped inside runStreams (see the note above). Using its sparkContext is what
// produces "Cannot call methods on a stopped SparkContext"; a SparkContext that is
// still running is needed for parallelize.
val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)

}

}

16/04/27 10:40:17 WARN StreamingContext: StreamingContext has already been stopped
16/04/27 10:40:17 INFO SparkContext: SparkContext already stopped.

Cannot call methods on a stopped SparkContext



更新:

这是基类 TestSuiteBase :
trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {

/** Framework name for the Spark context: the simple class name of the concrete suite. */
def framework: String = getClass.getSimpleName

/** Master URL for the Spark context. */
def master: String = "local[2]"

/** Batch duration of every StreamingContext created by this suite. */
def batchDuration: Duration = Seconds(1)

/** Directory where the checkpoint data will be saved; created on first access. */
lazy val checkpointDir: String = {
  val tempDir = Utils.createTempDir()
  logDebug(s"checkpointDir: $tempDir")
  tempDir.toString
}

/** Number of partitions of the input parallel collections created for testing. */
def numInputPartitions: Int = 2

/** Maximum time (in milliseconds) to wait before the test times out. */
def maxWaitTimeMillis: Int = 10000

/** Whether to use manual clock or not. */
def useManualClock: Boolean = true

/** Whether to actually wait in real time before changing manual clock. */
def actuallyWait: Boolean = false

/**
 * A SparkConf to use in tests. Can be modified before calling setupStreams
 * to configure things.
 */
val conf: SparkConf = new SparkConf().setMaster(master).setAppName(framework)

/** Timeout for use in ScalaTest `eventually` blocks. */
val eventuallyTimeout: PatienceConfiguration.Timeout =
  timeout(Span(10, ScalaTestSeconds))

/**
 * Default before function for any streaming test suite. Override this
 * if you want to add your stuff to "before" (i.e., don't call before { } ).
 *
 * Sets the "spark.streaming.clock" entry of `conf` to the manual clock or the
 * system clock, depending on `useManualClock`.
 */
def beforeFunction(): Unit = {
  if (useManualClock) {
    logInfo("Using manual clock")
    conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
  } else {
    logInfo("Using real clock")
    conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock")
  }
}

/**
 * Default after function for any streaming test suite. Override this
 * if you want to add your stuff to "after" (i.e., don't call after { } ).
 *
 * Clears the "spark.streaming.clock" JVM system property.
 * NOTE(review): beforeFunction sets this key on `conf`, not as a system property,
 * so this call does not undo that setting — confirm whether that is intended.
 */
def afterFunction(): Unit = {
  System.clearProperty("spark.streaming.clock")
}

// Register the overridable hooks so they run around every test.
// The hooks are invoked with explicit parentheses: relying on auto-application of an
// empty-paren method is deprecated in Scala 2.13 and rejected by Scala 3.
before(beforeFunction())
after(afterFunction())

/**
 * Executes `block` against the supplied StreamingContext and always stops that
 * context (together with its SparkContext) afterwards, whether the block returned
 * normally or threw.
 *
 * An Exception raised while stopping is logged and not rethrown.
 */
def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): R = {
  // Best-effort shutdown: failures are logged so they never mask the block's outcome.
  def stopQuietly(): Unit = {
    try {
      ssc.stop(stopSparkContext = true)
    } catch {
      case e: Exception =>
        logError("Error stopping StreamingContext", e)
    }
  }

  try {
    block(ssc)
  } finally {
    stopQuietly()
  }
}

/**
 * Executes `block` against the supplied TestServer and always stops the server
 * afterwards, whether the block returned normally or threw.
 *
 * An Exception raised while stopping is logged and not rethrown.
 */
def withTestServer[R](testServer: TestServer)(block: TestServer => R): R = {
  // Best-effort shutdown: failures are logged so they never mask the block's outcome.
  def stopQuietly(): Unit = {
    try {
      testServer.stop()
    } catch {
      case e: Exception =>
        logError("Error stopping TestServer", e)
    }
  }

  try {
    block(testServer)
  } finally {
    stopQuietly()
  }
}

/**
 * Builds a new StreamingContext wired for testing a unary DStream operation:
 * a TestInputStream fed from `input`, the `operation` applied to it, and a
 * registered TestOutputStreamWithPartitions collecting the result.
 *
 * @param input         sequence of input collections
 * @param operation     DStream operation under test
 * @param numPartitions number of partitions passed to the test input stream
 * @return the configured (not yet started) StreamingContext
 */
def setupStreams[U: ClassTag, V: ClassTag](
    input: Seq[Seq[U]],
    operation: DStream[U] => DStream[V],
    numPartitions: Int = numInputPartitions
  ): StreamingContext = {
  val streamingContext = new StreamingContext(conf, batchDuration)
  // Checkpointing is enabled only when a checkpoint directory is configured.
  Option(checkpointDir).foreach(dir => streamingContext.checkpoint(dir))

  // Input -> operation -> collecting output stream.
  val source = new TestInputStream(streamingContext, input, numPartitions)
  val transformed = operation(source)
  val sink = new TestOutputStreamWithPartitions(
    transformed,
    new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
  sink.register()

  streamingContext
}

/**
 * Builds a new StreamingContext wired for testing a binary DStream operation:
 * two TestInputStreams fed from `input1` and `input2`, the `operation` applied to
 * both, and a registered TestOutputStreamWithPartitions collecting the result.
 *
 * @param input1    first sequence of input collections
 * @param input2    second sequence of input collections
 * @param operation binary DStream operation under test
 * @return the configured (not yet started) StreamingContext
 */
def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
    input1: Seq[Seq[U]],
    input2: Seq[Seq[V]],
    operation: (DStream[U], DStream[V]) => DStream[W]
  ): StreamingContext = {
  val streamingContext = new StreamingContext(conf, batchDuration)
  // Checkpointing is enabled only when a checkpoint directory is configured.
  Option(checkpointDir).foreach(dir => streamingContext.checkpoint(dir))

  // Two inputs -> operation -> collecting output stream.
  val firstSource = new TestInputStream(streamingContext, input1, numInputPartitions)
  val secondSource = new TestInputStream(streamingContext, input2, numInputPartitions)
  val transformed = operation(firstSource, secondSource)
  val sink = new TestOutputStreamWithPartitions(
    transformed,
    new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
  sink.register()

  streamingContext
}

/**
 * Runs the streams set up in `ssc` for `numBatches` batches via
 * `runStreamsWithPartitions` and returns the collected output with the
 * per-partition sequences of each RDD flattened into one sequence.
 *
 * @param ssc               streaming context whose streams are run
 * @param numBatches        number of batches to run
 * @param numExpectedOutput number of output items to wait for
 * @return a sequence of items for each RDD
 */
def runStreams[V: ClassTag](
    ssc: StreamingContext,
    numBatches: Int,
    numExpectedOutput: Int
  ): Seq[Seq[V]] = {
  val outputByPartition = runStreamsWithPartitions[V](ssc, numBatches, numExpectedOutput)
  // Collapse the partitions of every RDD into a single Seq.
  outputByPartition.map(partitions => partitions.flatten.toSeq)
}

/**
* Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
* returns the collected output. It will wait until `numExpectedOutput` number of
* output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
*
* Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
* representing one partition.
*
* Note: the `finally` block always calls `ssc.stop(stopSparkContext = true)`, so both the
* StreamingContext and its underlying SparkContext are stopped when this method returns
* or throws. Callers must not use `ssc` or `ssc.sparkContext` afterwards.
*/
def runStreamsWithPartitions[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[Seq[V]]] = {
// Both counts must be strictly positive.
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)

// Get the output buffer
// Takes the first registered TestOutputStreamWithPartitions; `head` throws if none exists.
val outputStream = ssc.graph.getOutputStreams.
filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).
head.asInstanceOf[TestOutputStreamWithPartitions[V]]
val output = outputStream.output

try {
// Start computation
ssc.start()

// Advance manual clock
// This cast assumes the scheduler was configured with a ManualClock (see useManualClock).
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
if (actuallyWait) {
// Advance one batch at a time, sleeping for a real batch interval after each step.
for (i <- 1 to numBatches) {
logInfo("Actually waiting for " + batchDuration)
clock.advance(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
} else {
// Advance the clock by all batches in a single step.
clock.advance(numBatches * batchDuration.milliseconds)
}
logInfo("Manual clock after advancing = " + clock.getTimeMillis())

// Wait until expected number of output items have been generated
// Polls in 50 ms steps until enough output arrived or maxWaitTimeMillis elapsed.
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput &&
System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
ssc.awaitTerminationOrTimeout(50)
}
val timeTaken = System.currentTimeMillis() - startTime
logInfo("Output generated in " + timeTaken + " milliseconds")
output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
// Fail on timeout, or when the number of collected outputs is not exactly the expected one.
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")

Thread.sleep(100) // Give some time for the forgetting old RDDs to complete
} finally {
// Always stops the StreamingContext AND the SparkContext it wraps.
ssc.stop(stopSparkContext = true)
}
output
}

/**
 * Verify whether the output values after running a DStream operation
 * is same as the expected output values, by comparing the output
 * collections either as lists (order matters) or sets (order does not matter).
 *
 * Only the first `output.size` entries of `expectedOutput` are compared. If more
 * batches were generated than expected, the check fails with an explicit
 * assertion instead of an IndexOutOfBoundsException.
 *
 * @param output         generated output, one sequence per batch
 * @param expectedOutput expected output, one sequence per batch
 * @param useSet         compare each batch as a set (true) or as an ordered list (false)
 */
def verifyOutput[V: ClassTag](
    output: Seq[Seq[V]],
    expectedOutput: Seq[Seq[V]],
    useSet: Boolean
  ): Unit = {
  logInfo("--------------------------------")
  logInfo("output.size = " + output.size)
  logInfo("output")
  output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
  logInfo("expected output.size = " + expectedOutput.size)
  logInfo("expected output")
  expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
  logInfo("--------------------------------")

  // Failure message shared by both comparison modes; `kind` names the mode.
  def mismatchMessage(kind: String): String =
    s"$kind comparison failed\n" +
      s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
      s"Generated output (${output.size} items): ${output.mkString("\n")}"

  // Guard the index-based comparison below: every generated batch needs a counterpart.
  assert(
    output.size <= expectedOutput.size,
    s"Generated ${output.size} output batches but only ${expectedOutput.size} were expected")

  // Match the output with the expected output
  for (i <- 0 until output.size) {
    if (useSet) {
      assert(output(i).toSet === expectedOutput(i).toSet, mismatchMessage("Set"))
    } else {
      assert(output(i).toList === expectedOutput(i).toList, mismatchMessage("Ordered list"))
    }
  }
  logInfo("Output verified successfully")
}

/**
 * Test unary DStream operation with a list of inputs, with number of
 * batches to run same as the number of expected output values.
 *
 * Delegates to the overload taking `numBatches`, passing -1 so that the number of
 * batches defaults to `expectedOutput.size`.
 *
 * @param input          Sequence of input collections
 * @param operation      Unary DStream operation to be applied to the input
 * @param expectedOutput Sequence of expected output collections
 * @param useSet         Compare the output values with the expected output values
 *                       as sets (order does not matter) or as lists (order matters)
 */
def testOperation[U: ClassTag, V: ClassTag](
    input: Seq[Seq[U]],
    operation: DStream[U] => DStream[V],
    expectedOutput: Seq[Seq[V]],
    useSet: Boolean = false
  ): Unit = {
  testOperation[U, V](input, operation, expectedOutput, -1, useSet)
}

/**
 * Test unary DStream operation with a list of inputs.
 *
 * @param input          Sequence of input collections
 * @param operation      Unary DStream operation to be applied to the input
 * @param expectedOutput Sequence of expected output collections
 * @param numBatches     Number of batches to run the operation for; a value that is
 *                       not positive means `expectedOutput.size`
 * @param useSet         Compare the output values with the expected output values
 *                       as sets (order does not matter) or as lists (order matters)
 */
def testOperation[U: ClassTag, V: ClassTag](
    input: Seq[Seq[U]],
    operation: DStream[U] => DStream[V],
    expectedOutput: Seq[Seq[V]],
    numBatches: Int,
    useSet: Boolean
  ): Unit = {
  val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
  // The context is stopped when the block exits (and already by runStreams itself).
  withStreamingContext(setupStreams[U, V](input, operation)) { ssc =>
    val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
    verifyOutput[V](output, expectedOutput, useSet)
  }
}

/**
 * Test binary DStream operation with two lists of inputs, with number of
 * batches to run same as the number of expected output values.
 *
 * Delegates to the overload taking `numBatches`, passing -1 so that the number of
 * batches defaults to `expectedOutput.size`.
 *
 * @param input1         First sequence of input collections
 * @param input2         Second sequence of input collections
 * @param operation      Binary DStream operation to be applied to the 2 inputs
 * @param expectedOutput Sequence of expected output collections
 * @param useSet         Compare the output values with the expected output values
 *                       as sets (order does not matter) or as lists (order matters)
 */
def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
    input1: Seq[Seq[U]],
    input2: Seq[Seq[V]],
    operation: (DStream[U], DStream[V]) => DStream[W],
    expectedOutput: Seq[Seq[W]],
    useSet: Boolean
  ): Unit = {
  testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
}

/**
 * Test binary DStream operation with two lists of inputs.
 *
 * @param input1         First sequence of input collections
 * @param input2         Second sequence of input collections
 * @param operation      Binary DStream operation to be applied to the 2 inputs
 * @param expectedOutput Sequence of expected output collections
 * @param numBatches     Number of batches to run the operation for; a value that is
 *                       not positive means `expectedOutput.size`
 * @param useSet         Compare the output values with the expected output values
 *                       as sets (order does not matter) or as lists (order matters)
 */
def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
    input1: Seq[Seq[U]],
    input2: Seq[Seq[V]],
    operation: (DStream[U], DStream[V]) => DStream[W],
    expectedOutput: Seq[Seq[W]],
    numBatches: Int,
    useSet: Boolean
  ): Unit = {
  val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
  // The context is stopped when the block exits (and already by runStreams itself).
  withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc =>
    val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
    verifyOutput[W](output, expectedOutput, useSet)
  }
}
}

最佳答案

这些是您应该检查的几件事-

  • 验证您是否有在 spark-config 中指定的可用资源
  • 在代码库中搜索 stop() 关键字,并确认它没有在仍需使用的 SparkContext 上被调用
  • Spark 具有 Spark-UI 组件,您可以在其中查看运行的作业、失败或成功以及其日志。那会告诉你为什么它失败了。
  • 关于scala - 无法在已停止的 SparkContext 上调用方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36884845/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com