gpt4 book ai didi

scala - mapWithState 断言失败 : Block rdd_45_0 is not locked for reading

转载 作者:行者123 更新时间:2023-12-05 07:41:26 27 4
gpt4 key购买 nike

我正在使用函数 mapWithState() 在我的 spark 流应用程序中计算 UV。在 mapWithState 之后,我得到一个 dstreamforeachRDD。在函数foreachRDD中,有一个rdd.foreachPartition foreach Iterator,然后用Future在Iterator上申请foreach,但是我在Future中得到了一个错误。

  • 错误日志在这里:
>     17/07/27 10:19:54.0447 INFO Executor: Finished task 1.0 in stage 52.0 (TID 422). 1878 bytes result sent to driver
> 17/07/27 10:19:54.0454 DEBUG BlockManagerSlaveEndpoint: removing RDD 47
> 17/07/27 10:19:54.0454 INFO BlockManager: Removing RDD 47
> 17/07/27 10:19:54.0455 DEBUG BlockManagerSlaveEndpoint: Done removing RDD 47, response is 0
> 17/07/27 10:19:54.0455 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to 192.168.1.30:43968
> 17/07/27 10:19:54.0456 DEBUG BlockManagerSlaveEndpoint: removing RDD 46
> 17/07/27 10:19:54.0456 INFO BlockManager: Removing RDD 46
> 17/07/27 10:19:54.0456 DEBUG BlockManagerSlaveEndpoint: Done removing RDD 46, response is 0
> 17/07/27 10:19:54.0456 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to 192.168.1.30:43968
> 17/07/27 10:19:54.0461 WARN BoneCP: Thread close connection monitoring has been enabled. This will negatively impact on your
> performance. Only enable this option for debugging purposes!
> 17/07/27 10:19:54.0873 WARN ClickAnalysis$: before parpair data with threadName=ForkJoinPool-1-worker-5 and threadId=46
> 17/07/27 10:19:54.0873 WARN ClickAnalysis$: before parpair data with threadName=ForkJoinPool-1-worker-3 and threadId=50
> 17/07/27 10:19:54.0875 WARN ClickAnalysis$: come into foreach data with threadName=ForkJoinPool-1-worker-5 and threadId=46
> 17/07/27 10:19:54.0875 WARN ClickAnalysis$: come into foreach data with threadName=ForkJoinPool-1-worker-3 and threadId=50
> Exception: java.util.concurrent.ExecutionException: Boxed Error
> at scala.concurrent.impl.Promise$.resolver(Promise.scala:55)
> at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:47)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:244)
> at scala.concurrent.Promise$class.complete(Promise.scala:55)
> at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
> at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.AssertionError: assertion failed: Block rdd_45_0 is not locked for reading
> at scala.Predef$.assert(Predef.scala:170)
> at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
> at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
> at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
> at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at ClickAnalysis$.doPrepairCamAndGmtUvPs(ClickAnalysis.scala:383)
> at ClickAnalysis$$anonfun$8.apply(ClickAnalysis.scala:353)
> at ClickAnalysis$$anonfun$8.apply(ClickAnalysis.scala:345)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> ... 5 more
  • 这里是我的代码:

    val mapState3=pairs.mapWithState(StateSpec.function(mappingFunction).timeout(Duration(uvExpireTime.toLong))).map( x => (x._1, x._2.estimatedSize.toLong))

    mapState3.foreachRDD( { rdd =>{
    rdd.foreachPartition( uvRecord =>{
    if (!uvRecord.isEmpty) {

    doUpdateUV(uvRecord)

    }
    })
    def doUpdateUV(data:Iterator[(String, Long)]):Unit ={
    if(data != null){
    val f = Future{
    var connection:Connection = null
    try{
    connection = ConnectionPool.getConnection.getOrElse(null)
    connection.setAutoCommit(false)
    val camPs: PreparedStatement = connection.prepareStatement(updateUvCamCnt_sql)
    val gmtPs: PreparedStatement = connection.prepareStatement(updateUvGmtCnt_sql)
    logger.warn("before parpair data with threadName="+Thread.currentThread().getName+" and threadId="+Thread.currentThread().getId)
    for(uvRecord <- data) {
    logger.warn("come into foreach data with threadName=" + Thread.currentThread().getName + " and threadId=" + Thread.currentThread().getId)
    }
    logger.warn("come into batch update with threadName="+Thread.currentThread().getName+" and threadId="+Thread.currentThread().getId)
    camPs.executeBatch()
    gmtPs.executeBatch()
    connection.commit()
    camPs.close()
    gmtPs.close()
    } catch {
    case exception: Exception =>
    logger.error("Error in batchUpdate "+ exception.getMessage + "-----------------------" + ExceptionUtils.getStackTrace(exception) + "-----------------------------")
    throw exception
    } finally {
    ConnectionPool.closeConnection(connection)
    }
    "success"
    }
    f onSuccess {
    case result => println(s"Success: $result")
    }

    f onFailure {
    case t => println(s"Exception: ${ExceptionUtils.getStackTrace(t)}")
    }
    }

我期待为这个问题找到任何有用的解决方案。

最佳答案

我有同样的问题:

java.lang.AssertionError: assertion failed: Block rdd_xx_xx is notlocked for reading

我通过添加更多集群来修复它。这似乎是内存问题。

关于scala - mapWithState 断言失败 : Block rdd_45_0 is not locked for reading,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45341909/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com