gpt4 book ai didi

scala - 将 Scala 案例类转移到 rdd.map func 中的 JsValue 但任务不可序列化

转载 作者:行者123 更新时间:2023-12-02 22:21:15 25 4
gpt4 key购买 nike

我是 Scala/Spark 的新手,我有案例类的 RDD

case class Info(key1 : String, key2 : String, key3 : String)

我想将 RDD[Info] 传输到 RDD[JsString] 并将其保存到 ElasticSearch,我使用 play.api.libs 并定义写入转换器:
implicit val InfoWrites = new Writes[Info]{
def writes(i : Info): JsObject = Json.obj(
"key1" -> i.key1,
"key2" -> i.key2,
"key3" -> i.key3
)
}

然后我定义隐式类来使用 save func:
implicit class Saver(rdd : RDD[Info]) {
def save() : Unit = {
rdd.map{ i => Json.toJson(i).toString }.saveJsonToEs("resource"))
}
}

所以我可以保存 RDD[Info]
infoRDD.save()

但我一直收到 “任务不可序列化”错误 rdd.map() 中的 Json.toJson()

我也尝试像这样定义可序列化对象
object jsonUtils extends Serializable{
def toJsString(i : Info) : String = {
Json.toJson(i).toString()
}
}
rdd.map{ i => jsonUtils.toJsString(i) }

但不断收到错误“任务不可序列化”
如何更改代码?谢谢 !

最佳答案

我运行了下面的代码,类似于你的代码,它对我有用:

import models.Info
import org.apache.spark.rdd.RDD
import play.api.libs.json.Json
import domain.utils.Implicits._

class CustomFunctions(rdd : RDD[Info]) {
def save() = {
rdd.map(i => Json.toJson(i).toString ).saveAsTextFile("/home/training/so-123")
}

}

写了对应的 Implicits :
package domain.utils

import play.api.libs.json.{JsObject, Json, Writes}
import models.Info

class Implicits {
implicit val InfoWrites = new Writes[Info]{
def writes(i : Info): JsObject = Json.obj(
"key1" -> i.key1,
"key2" -> i.key2,
"key3" -> i.key3
)
}

}

object Implicits extends Implicits

创建模型 Info :
package models

case class Info(key1 : String, key2 : String, key3 : String)

创建了一个 SparkOperationsDao组合和创建 Spark 上下文:
package dao

import domain.utils.CustomFunctions
import models.Info
import org.apache.spark.{SparkConf, SparkContext}


class SparkOperationsDao {
val conf:SparkConf = new SparkConf().setAppName("driverTrack").setMaster("local")
val sc = new SparkContext(conf)

def writeToElastic() = {
val sample = List(Info("name1", "city1", "123"), Info("name2", "city2", "234"))
val rdd = sc.parallelize(sample)
val converter = new CustomFunctions(rdd)
converter.save()
}

}

object SparkOperationsDao extends SparkOperationsDao

运行应用程序:
import dao.SparkOperationsDao

object RapidTests extends App {
SparkOperationsDao.writeToElastic()
//.collect.foreach(println)

}

关于scala - 将 Scala 案例类转移到 rdd.map func 中的 JsValue 但任务不可序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62314247/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com