gpt4 book ai didi

csv - 将 Spark 数据帧写为带有分区的 CSV

转载 作者:行者123 更新时间:2023-12-04 02:55:04 26 4
gpt4 key购买 nike

我正在尝试将 Spark 中的数据帧写入 HDFS 位置,我希望如果我添加 partitionBy符号 Spark 将创建分区
(类似于 Parquet 格式的书写)
文件夹形式

partition_column_name=partition_value

(即 partition_date=2016-05-03 )。为此,我运行了以下命令:

(df.write
.partitionBy('partition_date')
.mode('overwrite')
.format("com.databricks.spark.csv")
.save('/tmp/af_organic'))

但尚未创建分区文件夹
知道我该怎么做才能让 spark DF 自动创建这些文件夹吗?

谢谢,

最佳答案

Spark 2.0.0+ :

内置的 csv 格式支持开箱即用的分区,因此您应该能够简单地使用:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)

不包括任何额外的包 .

Spark < 2.0.0 :

此时此刻 (v1.4.0) spark-csv不支持 partitionBy (请参阅 databricks/spark-csv#123 )但您可以调整内置源来实现您想要的。

您可以尝试两种不同的方法。假设您的数据相对简单(没有复杂的字符串并且需要字符转义)并且或多或少看起来像这样:

df = sc.parallelize([
("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])

您可以手动准备写入值:

from pyspark.sql.functions import col, concat_ws

key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])

kvs = df.select(key, values)

并使用 text 书写来源

kvs.write.partitionBy("k").text("/tmp/foo")

df_foo = (sqlContext.read.format("com.databricks.spark.csv")
.options(inferSchema="true")
.load("/tmp/foo/k=foo"))

df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)

在更复杂的情况下,您可以尝试使用适当的 CSV 解析器以类似的方式预处理值,通过使用 UDF 或映射到 RDD,但它会显着更加昂贵。

如果 CSV 格式不是硬性要求,您还可以使用支持 partitionBy 的 JSON 编写器盒子外面:

df.write.partitionBy("k").json("/tmp/bar")

以及读取时的分区发现。

关于csv - 将 Spark 数据帧写为带有分区的 CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37509932/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com