
scala - How to serialize the elastic4s ElasticSearch client to run with Spark RDDs?


I am currently running Spark MLlib ALS on millions of users and products, and because of heavy disk shuffling, the collect step in the code below takes far longer than the recommendProductsForUsers step. If I could somehow drop the collect step and feed the data to Elasticsearch directly from the executors, it would save a lot of time and compute resources.

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.elasticsearch.common.settings.ImmutableSettings


val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MYCLUSTER").build()
val client = ElasticClient.remote(settings, "11.11.11.11", 9300)
var ESMap = Map[String, List[String]]()

val topKReco = bestModel.get
  // below step takes 3 hours
  .recommendProductsForUsers(30)
  // below step takes 6 hours
  .collect()
  .foreach { r =>
    var i = 1
    val curr_user = r._1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    client.execute {
      index into "recommendations1" / "items" id curr_user fields ESMap
    }.await
  }

Now, when I run this code without the collect step, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at CatalogALS2$.main(CatalogALS2.scala:157)
at CatalogALS2.main(CatalogALS2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.sksamuel.elastic4s.ElasticClient
Serialization stack:
- object not serializable (class: com.sksamuel.elastic4s.ElasticClient, value: com.sksamuel.elastic4s.ElasticClient@e4c4af)
- field (class: CatalogALS2$$anonfun$2, name: client$1, type: class com.sksamuel.elastic4s.ElasticClient)
- object (class CatalogALS2$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

What I take from this is that if I could somehow serialize the com.sksamuel.elastic4s.ElasticClient class, I could run this task in parallel without collecting the data to the driver. To generalize the question: how do you serialize any class or function in Scala so it can be used inside an RDD operation?

Best Answer

Found the answer myself, by using serialization like this:

object ESConnection extends Serializable {

  // Elasticsearch client initiation; lazy, so the client is built on first
  // use rather than being serialized with the object
  val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MyCluster").build()
  lazy val client = ElasticClient.remote(settings, "11.11.11.11", 9300)

}

You can then use it inside RDD operations on the executors, without collecting the data to the driver. Each executor JVM resolves the ESConnection singleton locally, and because client is a lazy val, the non-serializable client is never shipped over the wire; it is constructed on first use on each executor:

val topKReco = bestModel.get
  .recommendProductsForUsers(30)
  // no collect required now
  .foreach { r =>
    var i = 1
    val curr_user = r._1
    // build the per-user document locally on the executor
    var ESMap = Map[String, List[String]]()
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    ESConnection.client.execute {
      index into "recommendation1" / "items" id curr_user fields ESMap
    }.await
  }
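
A possible refinement on top of this answer (not from the original post): building the documents inside foreachPartition makes the per-executor client reuse explicit and avoids any shared mutable state. A minimal sketch under the same assumptions as above (elastic4s 1.x DSL and the ESConnection object defined earlier; the item_ids lookup from the question is omitted for brevity):

import com.sksamuel.elastic4s.ElasticDsl._
import org.apache.spark.mllib.recommendation.Rating

bestModel.get
  .recommendProductsForUsers(30)
  .foreachPartition { users => // users: Iterator[(Int, Array[Rating])]
    users.foreach { case (userId, recs) =>
      // Build each user's document locally, so nothing mutable is shared
      val doc = recs.zipWithIndex.map { case (rec, rank) =>
        (rank + 1).toString -> List(rec.product.toString)
      }.toMap
      // ESConnection.client is resolved once per executor JVM (lazy val above)
      ESConnection.client.execute {
        index into "recommendations1" / "items" id userId fields doc
      }.await
    }
  }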

Regarding "scala - How to serialize the elastic4s ElasticSearch client to run with Spark RDDs?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31933671/
