gpt4 book ai didi

scala - 在 Spark (HDFS) 中写入 CSV 文件时选择哪个选项?

转载 作者:可可西里 更新时间:2023-11-01 15:23:30 25 4
gpt4 key购买 nike

我必须比较 CSV 文件,然后我必须删除所有重复的行。所以,我的情况就像我有一个文件夹,我必须将每个过滤结果放在该文件夹中,当一些新文件出现时,我必须将文件夹中的现有文件与新文件进行比较,最后,我必须把将结果返回到同一文件夹。

eg: /data/ingestion/file1.csv

a1 b1 c1

a2 b2 c2

a3 b3 c3

/data/ingestion/file2.csv

a4 b4 c4

a5 b5 c5

a6 b6 c6

new upcoming file(upcoming_file.csv):

a1 b1 c1

a5 b5 c5

a7 b7 c7

现在我的方法是从/data/ingestion/* 中存在的所有文件创建一个数据帧。然后创建 commoning_file.csv 的一个数据帧,并使用联合操作附加它们。最后,应用不同的转换。现在我必须将它写回/data/ingestion 以确保不会出现口是心非。所以,我选择覆盖操作。

deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion/")

然后我最终删除了文件夹/data/ingestion 中的所有内容。即使是新数据框也没有写入 CSV 文件。

我也尝试过其他选择,但我没有达到上面解释的效果!

提前致谢!

最佳答案

我建议将输出写入 hdfs 上的新目录 - 如果处理失败,您将始终能够丢弃任何已处理的内容并使用原始数据从头开始处理 - 它既安全又简单。 :)

处理完成后 - 只需删除旧的并将新的重命名为旧的名称。

更新:

deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion_tmp/")

Configuration conf = new Configuration();
conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf);
hdfs.delete("hdfs://localhost:8020/data/ingestion", isRecusrive);
hdfs.rename("hdfs://localhost:8020/data/ingestion_tmp", "hdfs://localhost:8020/data/ingestion");

Here是 HDFS 文件系统 API 文档的链接

关于scala - 在 Spark (HDFS) 中写入 CSV 文件时选择哪个选项?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50755135/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com