
scala - Apply a function to each row of a Spark DataFrame


I have a Spark DataFrame (df) with 2 columns (Report_id and Cluster_number).

I want to apply a function (getClusterInfo) to df which will return the name of each cluster, i.e., if the cluster number is "3", then for that particular report_id, the 3 rows shown below would be written:

{"cluster_id":"1","influencers":[{"screenName":"A"},{"screenName":"B"},{"screenName":"C"},...]}
{"cluster_id":"2","influencers":[{"screenName":"D"},{"screenName":"E"},{"screenName":"F"},...]}
{"cluster_id":"3","influencers":[{"screenName":"G"},{"screenName":"H"},{"screenName":"E"},...]}

I am using foreach on df to apply the getClusterInfo function, but I can't figure out how to convert the output into a DataFrame of (Report_id, Array[cluster_info]).

Here is the code snippet:

  df.foreach(row => {
    val report_id = row(0)
    val cluster_no = row(1).toString
    val cluster_numbers = new Range(0, cluster_no.toInt - 1, 1)
    for (cluster <- cluster_numbers.by(1)) {
      val cluster_id = report_id + "_" + cluster
      // get cluster influencers
      val result = getClusterInfo(cluster_id)
      println(result.get)
      val res: String = result.get.toString()
      // TODO ?
    }
    // .. TODO ?
  })

Best Answer

Generally speaking, you should not use foreach when you want to map something into something else; foreach is good for applying functions that only have side effects and return nothing.
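As a quick illustration of that distinction (not part of the original answer, just a minimal sketch assuming a DataFrame df with an integer Report_id column, like the sample data further down):

// foreach: runs a side-effecting function on each row and returns Unit - nothing to build on
df.foreach(row => println(row))

// map: a transformation that yields a new Dataset you can keep working with
import spark.implicits._
val reportIds = df.map(row => row.getAs[Int]("Report_id"))   // Dataset[Int]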

In this case, if I got the details right (which I may not have), you can use a User-Defined Function (UDF) and explode the result:

import org.apache.spark.sql.functions._
import spark.implicits._

// I'm assuming we have these case classes (or similar)
case class Influencer(screenName: String)
case class ClusterInfo(cluster_id: String, influencers: Array[Influencer])

// I'm assuming this method is supplied - with your own implementation
def getClusterInfo(clusterId: String): ClusterInfo =
  ClusterInfo(clusterId, Array(Influencer(clusterId)))

// some sample data - assuming both columns are integers:
val df = Seq((222, 3), (333, 4)).toDF("Report_id", "Cluster_number")

// actual solution:

// UDF that returns an array of ClusterInfo;
// Array size is 'clusterNo', creates cluster id for each element and maps it to info
val clusterInfoUdf = udf { (clusterNo: Int, reportId: Int) =>
  (1 to clusterNo).map(v => s"${reportId}_$v").map(getClusterInfo)
}

// apply UDF to each record and explode - to create one record per array item
val result = df.select(explode(clusterInfoUdf($"Cluster_number", $"Report_id")))

result.printSchema()
// root
// |-- col: struct (nullable = true)
// | |-- cluster_id: string (nullable = true)
// | |-- influencers: array (nullable = true)
// | | |-- element: struct (containsNull = true)
// | | | |-- screenName: string (nullable = true)

result.show(truncate = false)
// +-----------------------------+
// |col |
// +-----------------------------+
// |[222_1,WrappedArray([222_1])]|
// |[222_2,WrappedArray([222_2])]|
// |[222_3,WrappedArray([222_3])]|
// |[333_1,WrappedArray([333_1])]|
// |[333_2,WrappedArray([333_2])]|
// |[333_3,WrappedArray([333_3])]|
// |[333_4,WrappedArray([333_4])]|
// +-----------------------------+
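
If you would rather keep the (Report_id, Array[cluster_info]) shape the question asks for, a sketch (my own addition, reusing the same clusterInfoUdf) is to skip the explode; and to_json can serialize each exploded struct into JSON strings like the ones shown in the question:

// one row per Report_id, with the whole array of ClusterInfo structs in a single column
val perReport = df.select(
  $"Report_id",
  clusterInfoUdf($"Cluster_number", $"Report_id").as("cluster_info"))

// turn each exploded struct into a JSON string, similar to the lines in the question
result.select(to_json($"col").as("cluster_json")).show(truncate = false)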

Regarding "scala - Apply a function to each row of a Spark DataFrame", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49329406/
