gpt4 book ai didi

apache-spark - 我可以获取 Spark 读取的文件的元数据吗

转载 作者:行者123 更新时间:2023-12-05 09:08:59 25 4
gpt4 key购买 nike

假设我们有 2 个文件,文件#1 在 12:55 创建,文件#2 在 12:58 创建。在阅读这两个文件时,我想添加一个新列“creation_time”。属于文件#1 的行在“creation_time”列中有 12:55,属于文件#2 的行在“creation_time”列中有 12:58。

new_data = spark.read.option("header", "true").csv("s3://bucket7838-1/input")

我正在使用上面的代码片段来读取“输入”目录中的文件。

最佳答案

使用input_file_name()函数获取文件名,然后使用hdfs file api获取文件timestamp 最终加入 filename 上的两个数据帧。

示例:

from pyspark.sql.types import *
from pyspark.sql.functions import *
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("hdfs://<namenode_address>:8020"), Configuration())

status = fs.listStatus(Path('<hdfs_directory>'))

filestatus_df=spark.createDataFrame([[str(i.getPath()),i.getModificationTime()/1000] for i in status],["filename","modified_time"]).\
withColumn("modified_time",to_timestamp(col("modified_time")))

input_df=spark.read.csv("<hdfs_directory>").\
withColumn("filename",input_file_name())

#join both dataframes on filename to get filetimestamp
df=input_df.join(filestatus_df,['filename'],"left")

关于apache-spark - 我可以获取 Spark 读取的文件的元数据吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62846669/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com