gpt4 book ai didi

java - UDF 仅从 Spark SQL 中的路径中提取文件名

转载 作者:搜寻专家 更新时间:2023-11-01 01:49:56 30 4
gpt4 key购买 nike

Apache Spark 中有一个 input_file_name 函数,我用它来将新列添加到数据集中,其中包含当前正在处理的文件的名称。

问题是我想以某种方式自定义此函数以仅返回文件名,而在 s3 上忽略它的完整路径。

目前,我正在使用 map 函数替换第二步中的路径:

val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
val baseName: String = FilenameUtils.getBaseName(fileName)
val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
this.valueOf(tmpFileName)
}

但我想用类似的东西

val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)

最佳答案

在 Scala 中:

#register udf
spark.udf
.register("get_only_file_name", (fullPath: String) => fullPath.split("/").last)

#use the udf to get last token(filename) in full path
val initialDs = spark.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path)
.withColumn("input_file_name", get_only_file_name(input_file_name))

编辑:在 Java 中 根据评论

#register udf
spark.udf()
.register("get_only_file_name", (String fullPath) -> {
int lastIndex = fullPath.lastIndexOf("/");
return fullPath.substring(lastIndex, fullPath.length - 1);
}, DataTypes.StringType);

import org.apache.spark.sql.functions.input_file_name

#use the udf to get last token(filename) in full path
Dataset<Row> initialDs = spark.read()
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path)
.withColumn("input_file_name", get_only_file_name(input_file_name()));

关于java - UDF 仅从 Spark SQL 中的路径中提取文件名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40848681/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com