gpt4 book ai didi

apache-spark - 如何在 Spark Streaming for Lookups 中创建到数据源的连接

转载 作者:IT王子 更新时间:2023-10-29 06:10:33 35 4
gpt4 key购买 nike

我有一个用例,我们正在流式传输事件,并且对于每个事件我都必须进行一些查找。查找在 Redis 中,我想知道创建连接的最佳方法是什么。 Spark 流将运行 40 个执行程序,我有 5 个这样的流作业都连接到同一个 Redis 集群。所以我很困惑我应该采用什么方法来创建 Redis 连接

  1. 在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量来做到这一点吗?

  2. 为每个分区创建一个 Redis 连接,但是我的代码是这样写的

    val update = xyz.transform(rdd => {
    //在驱动程序上
    如果(xyz.isNewDay){
    .....
    }
    RDD
    })
    update.foreachRDD(rdd => {
    rdd.foreachPartition(分区 => {
    partition.foreach(Key_trans => {
    //在这里执行一些查找逻辑
    }
    }
    })

现在,如果我在每个分区内创建一个连接,这意味着对于每个 RDD 以及该 RDD 中的每个分区,我都将创建一个新连接。

有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?

如果需要,我可以添加更多上下文/信息。

最佳答案

1。在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量来做到这一点吗?

答案 - 不可以。由于与连接关联的机器相关数据,大多数连接对象不可序列化。

2。有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?

Ans- 是的,创建一个连接池并在分区中使用它。这是风格。你可以像这样创建一个连接池 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

然后使用它

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}

请检查: design pattern for using foreachRDD

关于apache-spark - 如何在 Spark Streaming for Lookups 中创建到数据源的连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55190315/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com