gpt4 book ai didi

scala - 如何在 foreachPartition 中使用 SQLContext 和 SparkContext

转载 作者:行者123 更新时间:2023-12-01 09:17:24 25 4
gpt4 key购买 nike

我想在 foreachPartition 中使用 SparkContext 和 SQLContext ,但由于序列化错误而无法执行。我知道这两个对象都不可序列化,但我认为 foreachPartition在 master 上执行,其中 Spark Context 和 SQLContext 都可用。

符号:

`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`

这是我当前的代码(UtilsDM 是 extends Serializable 的对象)。失败的代码部分从 val schema =...开始,我想写的地方 resultDataFrame然后将其保存到 Parquet。也许我组织代码的方式效率低下,那么我想在这里提供您的建议。谢谢。
// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
df = sqlContext
.read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)

// Here I process myDStream
myDStream.foreachRDD(rdd => {
rdd.foreachPartition{iter =>
val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
val producer = UtilsDM.createProducer
var df = UtilsDM.getDF
val result = iter.map{ msg =>
// ...
Seq(msg("key"),msg("value"))
}

// HERE I WANT TO WRITE result TO S3, BUT IT FAILS
val schema = StructType(
StructField("key", StringType, true) ::
StructField("value", StringType, true)

result.foreach { row =>
val rdd = sc.makeRDD(row)
val df2 = sqlContext.createDataFrame(rdd, schema)

// If the parquet file is not created, then create it
var df_final: DataFrame = null
if (df != null) {
df_final = df.unionAll(df2)
} else {
df_final = df2
}
df_final.write.parquet("s3n://bucket/pathToSentMessages)
}
}
})

编辑:

我使用的是 Spark 1.6.2 和 Scala 2.10.6。

最佳答案

这不可能。 SparkContext , SQLContextSparkSession只能在驱动程序上使用。您可以在 foreachRDD 的顶层使用 sqlContext :

 myDStream.foreachRDD(rdd => {
val df = sqlContext.createDataFrame(rdd, schema)
...
})

你不能在转换/ Action 中使用它:
myDStream.foreachRDD(rdd => {
rdd.foreach {
val df = sqlContext.createDataFrame(...)
...
}
})

你可能想要相当于:
myDStream.foreachRDD(rdd => {
val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter))
val df = sqlContext.createDataFrame(foo, schema)
df.write.parquet("s3n://bucket/pathToSentMessages)
})

关于scala - 如何在 foreachPartition 中使用 SQLContext 和 SparkContext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40691086/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com