gpt4 book ai didi

scala - 在 Spark 中发现和读取多个文件

转载 作者:行者123 更新时间:2023-12-02 02:45:43 26 4
gpt4 key购买 nike

有不同的系统,它们有不同的文件集(txt,csv)要加载和转换并写入输出文件
使用 Apache Spark/Scala。
假设 SystemA 有 3 个文件,SystemB 在各自的目录中有 2 个文件。

FileType       |FileNames
-----------------------------------------
Customer |Customer_20190301.csv
Account |Account_20190301.csv
Order |Order_20190301.csv
OrderDetails |OrderDetails_20190301.txt
Transactions |Transactions_20190301.txt

现在我想根据作为输入给出的系统名称获取文件名和路径,以便我可以加载它们各自的系统文件。
我不想为每个系统创建单独的程序并加载它们的文件,因为文件名或路径将来可能会改变。

有没有一种有效的方法来处理这个问题?使用配置文件?
或者可能正在使用或不使用任何外部库?请指导我。

最佳答案

这个问题很适合采用分而治之的方法:

  • 描述系统的数量 + 参数化进一步处理所需的任何参数。你如何做到这一点取决于你的部署环境、选择的语言等。没有一个正确的答案。
  • 将(1)中的信息读入数据结构。
  • 使用(2)和(递归)目录列表的某种组合生成要处理的文件列表。请注意,给定路径,您可以使用 FileSystem.get(new java.net.URI(path), new Configuration()) 在 Spark 中获取 Hadoop 文件系统。 .
  • 按类型对文件进行分组。
  • 对于每个组,参数化一个 DataFrameReader来自 spark.read适本地并使用 .load(paths: _*) 调用负载的多路径版本.您可以通过将组名映射到返回 DataFrameReader 的函数来概括此代码。 .

  • 以下是如何执行 (5) 的示例:
    val readers: Map[String, SparkSession => DataFrameReader] = Map(
    "customer" -> ((spark: SparkSession) => spark.read.option("format", "csv"))
    )

    val groups: Map[String, Seq[String]] = ???

    groups.map { case (groupName, paths) =>
    readers(groupName)(spark).load(paths: _*)
    }

    关于scala - 在 Spark 中发现和读取多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55203493/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com