gpt4 book ai didi

scala - 使用spark和scala将ListBuffer [List [Any]]值写入CSV

转载 作者:行者123 更新时间:2023-12-02 22:00:38 25 4
gpt4 key购买 nike

我现在重新提出了我的问题。

我正在学习scala和spark。我知道直接从csv文件创建RDD,而不是创建DF并将其转换为RDD。但是,我正在尝试以下组合。

创建scala ListBuffer,Spark Dataframe并将其转换为RDD:

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> var src_policy_final = new ListBuffer[List[Any]]
src_policy_final: scala.collection.mutable.ListBuffer[List[Any]] = ListBuffer()

scala> var src_policy_final = new ListBuffer[List[Any]]
src_policy_final: scala.collection.mutable.ListBuffer[List[Any]] = ListBuffer()

scala> var src_policy_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("sparktest/policy_details.csv")
src_policy_df: org.apache.spark.sql.DataFrame = [policy_id: int, otherdetails: string]

scala> var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)
src_rdd: org.apache.spark.rdd.RDD[List[Any]] = MapPartitionsRDD[40] at map at <console>:26

scala> var src_pol_list = src_rdd.collect.toList
src_pol_list: List[List[Any]] = List(List(10110000, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))

使用scala进行循环,我正在迭代Spark RDD记录以替换列值(带有surrogateId的policy_id),如下所示-
scala> for(pol_details <- src_pol_list){
| src_policy_final += pol_details.toList.map(e => if(e==10110000) 1345678 else e)
| }

我正在使用 .map(e => if(e==orig_pol_id) ref_surr_id else e)更改记录的特定列值,并将记录添加到 ListBuffer[List[Any]]。一旦完成迭代,将RDD中的所有记录扔掉,我将使用 ListBuffer[Lis[Any]]函数将 SaveAsTextFile("/sparktest/policy_details")值作为csv文件写入HDFS文件系统中

当我执行src_policy_final的println时,输出为:
    scala> println(src_policy_final)
ListBuffer(List(1345678, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))

现在,通过将ListBuffer [ListAny]]转换为RDD,将修改后的数据写回到HDFS文件系统:
scala> var src_write = sc.parallelize(src_policy_final.toList)
src_write: org.apache.spark.rdd.RDD[List[Any]] = ParallelCollectionRDD[43] at parallelize at <console>:53

写入HDFS文件系统:
scala> src_write.saveAsTextFile("sparktest/pol_det")

输出数据如下:
List(1345678, This is the first policy)
List(10456200, This is the second policy)
List(10345300, This is the third policy)

想要得到的输出是:
1345678, This is the first policy
10456200, This is the second policy
10345300, This is the third policy

我不确定如何根据我的要求加载输出。

希望,我已经对我要实现的目标给出了更好的解释。

最佳答案

我真的不明白你想做什么...
但是,由于您说的是学习,所以我将尝试逐步解释所有内容-希望对您有所帮助。

首先,作为几年前从Java转到Scala的同事的建议。尽可能避免所有变异,迫使自己以功能性方式进行思考和编程-因此,请使用val而不是var不可变集合,而不是可变的集合。

其次,尽可能避免使用Any类型的东西,例如在这里...

var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)

...您可以采用更类型化的方式从每个 Row中获取所需的值,例如:
val src_rdd = src_policy_df.rdd.map { row =>
(
row.getAs[Int](fieldName = "policy_id"),
row.getAs[String](fieldName = "otherdetails")
)
}
// src_rdd: RDD[(Int, String)]

甚至更好,请使用 Dataset(一个 类型的 DataFrame)。
import spark.implicits._ // spark is an instance of SparkSession
final case class Policy(policy_id: Int, otherdetails: String)
val src_dataset = src_policy_df.as[Policy] // implicit Encoder needed here, provided by the spark implicits.

在Spark中,您 不应该对数据进行 collect-除了作为计算管道的最后一步(而且在大多数情况下,此操作仅在调试阶段完成,因为通常将其保存到外部数据存储中,例如HDFS或mongo),或者如果您确定自己有一个小的 RDD,希望将其作为查找表或类似内容供其他转换访问(例如,这在缩减对RDD上非常常见,因此存在 reduceByKeyLocally方法返回 map )。
为什么? -因为 collect将在 执行器上分发的所有数据都带到 驱动程序,这意味着您不再使用该框架来并行化计算。
您应该做的是使用Spark提供的 转换来构建计算,例如 map
val orig_pol_id = 10110000
val ref_surr_id = 1345678

// Using RDDs.
val src_policy_final_rdd = src_rdd.map {
case (id, otherdetails) if (id == orig_pol_id) => (ref_surr_id, otherdetails)
case policy => policy // default case, nothing change.
}

// Using Datasets.
val src_policy_final_dataset = src_dataset.map {
case policy if (policy.id == orig_pol_id) => policy.copy(id = ref_surr_id) // the copy method returns a new policy with the provided fields changed.
case policy => policy // default case, nothing change.
}

最后,当将 RDD写入 HDFS 时,它将在每个元素上使用默认的 toString来打印每一行。因此,您可能需要先对其进行格式化,然后再保存。
val write_rdd = src_policy_final_rdd.map {
case (id, otherdetails) => s"$id,$otherdetails"
}
// wirte_rdd: RDD[String]
src_write.saveAsTextFile("sparktest/pol_det")

或者,如果您使用的是 Dataset,则可以使用 DataframeWriter api来为您处理所有这些操作。 (推荐的)
src_policy_final_dataset
.write
.option("header", "true")
.option("sep", ",") // ',' is the default separator, but I prefer to be specific.
.csv("sparktest/pol_det")

这应该解决您所有的问题。

PS:最后两点。
首先,这个问题通常在 SO 中被问到/回答不了,因此请尽量限制自己的范围,下次再说清楚。
而且,您可以尝试先阅读有关Spark的知识,并做一些快速教程以使自己对框架更加满意-顺便说一句, this是我几天前为办公室制作的简短Spark研讨会,旨在供非Scala开发人员使用,希望它也可以帮助您:)

关于scala - 使用spark和scala将ListBuffer [List [Any]]值写入CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53525139/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com