gpt4 book ai didi

scala - 使用 foreach 行在数据框中捕获和写入字符串

转载 作者:可可西里 更新时间:2023-11-01 16:24:19 25 4
gpt4 key购买 nike

在使用 Scala 替换从数据帧的每一行的特定字段中获取的内容后, try catch 并写入字符串值。但是由于它部署在集群上无法捕获任何记录。谁能提供解决方案?

假设 TEST_DB.finalresult 有 2 个字段 input1 和 input2:

val finalresult=spark.sql("select * from TEST_DB.finalresult")

finalResult.foreach { row =>
val param1=row.getAs("input1").asInstanceOf[String]
val param2=row.getAs("input2").asInstanceOf[String]

val string = """new values of param1 and param2 are -> """ + param1 + """,""" + param2
// how to append modified string to csv file continously for each microbatch in hdfs ??
}

最佳答案

在您的代码中,您创建了所需的 string 变量,但它没有保存在任何地方,因此您看不到结果。

您可能会在每次执行 foreach 时打开所需的 csv 文件并附加新字符串,但我想提出一个不同的解决方案。

如果可以,请尝试始终使用 Spark 的内置功能,因为它(通常)在处理空输入方面更加优化和更好。您可以通过以下方式实现相同的目的:

import org.apache.spark.sql.functions.{lit, concat, col}

val modifiedFinalResult = finalResult.select(
concat(
lit("new values of param1 and param2 are -> "),
col("input1"),
lit(","),
col("input2")
).alias("string")
)

在变量 modifiedFinalResult 中,您将拥有一个包含名为 string 的单列的 spark 数据框,它表示与您的变量 string 完全相同的输出你的代码。之后,您可以将数据帧直接保存为单个 csv 文件(使用重新分区功能):

modifiedFinalResult.repartition(1).write.format("csv").save("path/to/your/csv/output")

PS:也是对 future 的一个建议,尽量避免以数据类型命名变量。

UPDATE: Fixed the empty rows issue by using "concat_ws" instead of concat and coalesce to each fields. It seems some of the values which were null were transforming the entire concatenated string to null after the transformation. Nevertheless this solution works for now!

关于scala - 使用 foreach 行在数据框中捕获和写入字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56365224/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com