gpt4 book ai didi

scala - 在完成 map 操作之前,Spark,mapPartitions,网络连接已关闭

转载 作者:行者123 更新时间:2023-12-03 00:14:55 26 4
gpt4 key购买 nike

我正在执行Spark作业,有时我想连接到 flex 搜索服务器以获取一些数据并将其添加到RDD。所以我正在使用的代码看起来像这样

 input.mapParitions(records=>{
val elcon=new ElasticSearchConnection
val client:TransportClient=elcon.openConnection()
val newRecs=records.flatMap(record=>{
val response = client.prepareGet("index" "indexType",
record.id.toString).execute().actionGet()
val newRec=processRec(record,reponse)
newRec
})//end of flatMap
client.close()
newRecs
})//end of mapPartitions

我的问题是,在 client.close()操作完成之前调用了 flatMap命令,这当然会导致异常。如果我在flatMap内移动连接的生成和关闭,该代码将起作用,但这将生成大量连接。是否可以确保flatMap操作完成后将调用 client.close

最佳答案

对RDD中的每个项目进行阻塞调用以获取相应的ElasticSearch文档,就是造成此问题的原因。通常建议避免阻塞 call 。

还有另一种使用ElasticSearch-for-Hadoop's Spark support的方法。

将ElasticSearch索引/类型读取为另一个RDD,并将其与RDD结合在一起。

包括正确的ESHadoop dependency版本。

import org.elasticsearch.spark._
val esRdd = sc.esRDD("index/indexType") //This returns a pair RDD of (_id, Map of all key value pairs for all fields]
input.map(record => (record.id, record)) //Convert your RDD of records to a pair rdd of (id, record) as we want to join based on the id
input.join(esRdd).map(rec => processResponse(rec._2._1, rec._2._2)) // Join the two RDDs based on id column it returns a pair RDD with key=id & value=Pair of matching records (id,(inputrddrecord,esrddrecord))

希望这可以帮助。

PS:它仍然不能缓解缺乏同一地点的问题。 (即,每个带有_id的文档都将来自索引的不同分片)。更好的方法是在创建ES索引时实现输入RDD和ES索引文档的共置。

关于scala - 在完成 map 操作之前,Spark,mapPartitions,网络连接已关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37070375/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com