gpt4 book ai didi

apache-spark - Spark 流和连接池实现

转载 作者:行者123 更新时间:2023-12-04 02:43:14 24 4
gpt4 key购买 nike

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams上的流媒体 Spark 网站提到以下代码:

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}

我尝试使用org.apache.commons.pool2实现此功能,但是运行应用程序失败,并出现预期的java.io.NotSerializableException:
15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
...

我想知道实现可序列化的连接池有多现实。有人成功做到了吗?

谢谢你。

最佳答案

为了解决此“本地资源”问题,需要的是一个单例对象-即保证在JVM中实例化一次且仅实例化一次的对象。幸运的是,Scala object提供了开箱即用的功能。

要考虑的第二件事是,该单例将为在托管它的同一JVM上运行的所有任务提供服务,因此,它必须照顾并发性和资源管理。

让我们尝试草绘(*)这样的服务:

class ManagedSocket(private val pool: ObjectPool, val socket:Socket) {
def release() = pool.returnObject(socket)
}

// singleton object
object SocketPool {
var hostPortPool:Map[(String, Int),ObjectPool] = Map()
sys.addShutdownHook{
hostPortPool.values.foreach{ // terminate each pool }
}

// factory method
def apply(host:String, port:String): ManagedSocket = {
val pool = hostPortPool.getOrElse{(host,port), {
val p = ??? // create new pool for (host, port)
hostPortPool += (host,port) -> p
p
}
new ManagedSocket(pool, pool.borrowObject)
}
}

然后用法变为:
val host = ???
val port = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val mSocket = SocketPool(host, port)
partition.foreach{elem =>
val os = mSocket.socket.getOutputStream()
// do stuff with os + elem
}
mSocket.release()
}
}

我假设问题中使用的 GenericObjectPool正在照顾并发性。否则,需要使用某种形式的同步来保护对每个 pool实例的访问。

(*)提供的代码来说明有关如何设计此类对象的想法-需要付出额外的努力才能转换为工作版本。

关于apache-spark - Spark 流和连接池实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30450763/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com