apache-spark - PySpark: cannot do column operations with datetime year = 0001


I have some data with timestamps in the format "0001-mm-dd HH:MM:SS". I am trying to get the minimum time. In order to get the minimum time, I need to convert to a DoubleType first, because the minimum function for PySpark DataFrames apparently doesn't work on timestamps. However, for some reason datetimes hate year 0001, and no matter what I do I can't get this to work. Below, I try to bump the year up by 1 manually with a UDF, but for some reason it doesn't register. However, if I use a different data column that has no year-0001 values and change the if statement in the function to a year that is present in that data, I can watch the year change.

What am I doing wrong?

import datetime
import time

from pyspark.sql import SQLContext
import pyspark.sql.functions as sfunc
import pyspark.sql.types as tp
from pyspark import SparkConf
from dateutil.relativedelta import relativedelta

columnname = 'x'
#columnname = 'y'
tmpdf.select(columnname).show(5)

def timeyearonecheck(date):
    '''Datetime breaks down at year = 0001, so bump the year up to 0002.'''
    if date.year == 1:
        newdate = date + relativedelta(years=1)
        return newdate
    else:
        return date

def timeConverter(timestamp):
    '''Takes either a TimestampType() or a DateType() and converts it into a
    float.'''
    timetuple = timestamp.timetuple()
    if type(timestamp) == datetime.date:
        timevalue = time.mktime(timetuple)
        return int(timevalue)
    else:
        timevalue = time.mktime(timetuple) + timestamp.microsecond/1000000
        return timevalue

tmptimedf1colname = 'tmpyeartime'
yearoneudf = sfunc.udf(timeyearonecheck, tp.TimestampType())
tmptimedf1 = tmpdf.select(yearoneudf(sfunc.col(columnname)).alias(tmptimedf1colname))
tmptimedf2colname = 'numbertime'
timeudf = sfunc.udf(timeConverter, tp.DoubleType())
tmptimedf2 = tmptimedf1.select(timeudf(sfunc.col(tmptimedf1colname)).alias(tmptimedf2colname))
minimum = tmptimedf2.select(tmptimedf2colname).rdd.min()[0]


+-------------------+
| x|
+-------------------+
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
+-------------------+
only showing top 5 rows

Py4JJavaError Traceback (most recent call last)
<ipython-input-42-b5725bf01860> in <module>()
17 timeudf=sfunc.udf(timeConverter,tp.DoubleType())
18 tmptimedf2=tmpdf.select(timeudf(sfunc.col(columnname)).alias(tmptimedf2colname))
---> 19 minimum=tmptimedf2.select(tmptimedf2colname).rdd.min()[0]
20 print(minimum)
...
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 43.0 failed 4 times, most recent failure: Lost task 3.3 in stage
43.0 (TID 7829, 10.10.12.41, executor 39):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
ValueError: year 0 is out of range

Even if I just try to look at the output of the first UDF I get an error, but only when I look at the output, not when it is actually computed.

tmptimedf1.select(tmptimedf1colname).show(5)

Py4JJavaError Traceback (most recent call last)
<ipython-input-44-5fc942678065> in <module>()
----> 1 tmptimedf1.select(tmptimedf1colname).show(5)
...
Py4JJavaError: An error occurred while calling o2215.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage
44.0 (TID 7984, 10.10.12.36, executor 4):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
...
ValueError: year 0 is out of range

Also, if I do the following, I get the same ValueError complaining about year 0:

tmpdf.select(columnname).first()

but only if I use the column with year-0001 values, not the 'y' column that has none. The 'y' column works fine.

I don't understand why I can show 5 values of tmpdf that include 0001, yet I can't select the first value because it contains 0001.

Edit: As discussed below, I really do want to convert year 0001 to 0002, because PySpark's approxQuantile doesn't work with timestamps, and in general I don't know the dataset well enough to know which year would be an acceptable replacement. 0001 is definitely a padding year, but 1970 could be a real year in my data (and in the general case for my work).
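(For context, a minimal sketch of the quantile call this is ultimately for, assuming the timestamp column can simply be cast to epoch seconds; names as above:)

# Sketch only: approxQuantile expects a numeric column, so cast the timestamp
# column to a double of epoch seconds before asking for quantiles.
numericdf = tmpdf.select(sfunc.col(columnname).cast("double").alias("tsec"))
quartiles = numericdf.approxQuantile("tsec", [0.25, 0.5, 0.75], 0.01)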

This is what I've got so far:

def tmpfunc(timestamp):
    time = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return time

adf = datadf.select(sfunc.col(columnname).cast("string").alias('a'))
newdf = adf.withColumn('b', sfunc.regexp_replace('a', '0001-', '0002-'))
newdf.show(10)
print(newdf.first())
tmpudf = sfunc.udf(tmpfunc, tp.TimestampType())
newnewdf = newdf.select(tmpudf(sfunc.col('b')).alias('c'))
newnewdf.show(10)
print(newnewdf.first())

+-------------------+-------------------+
| a| b|
+-------------------+-------------------+
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2015-10-13 09:56:09|2015-10-13 09:56:09|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2013-11-05 21:28:09|2013-11-05 21:28:09|
|1993-12-24 03:52:47|1993-12-24 03:52:47|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
+-------------------+-------------------+
only showing top 10 rows

Row(a='0001-01-02 00:00:00', b='0002-01-02 00:00:00')
+-------------------+
| c|
+-------------------+
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|2015-10-13 09:56:09|
|0002-01-03 23:56:02|
|2013-11-05 21:28:09|
|1993-12-24 03:52:47|
|0002-01-03 23:56:02|
+-------------------+
only showing top 10 rows

Row(c=datetime.datetime(2, 1, 2, 0, 0))

As a user commented below, the values in the "show" output are off by 1 day, 23 hours, 56 minutes and 2 seconds. Why, and how do I get rid of that? And why is my "first" call correct, yet also missing a 0 where it should read (2, 1, 2, 0, 0, 0)?
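(On the second point, the "missing" 0 appears to be nothing more than how datetime's repr works: trailing zero fields such as seconds are simply omitted, so the value itself is complete:)

import datetime

# datetime's repr drops trailing zero fields (second, microsecond), so
# datetime(2, 1, 2, 0, 0) is the full value 0002-01-02 00:00:00.
d = datetime.datetime(2, 1, 2)
print(repr(d))   # datetime.datetime(2, 1, 2, 0, 0)
print(d.second)  # 0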

Best Answer

In order to get the minimum time, I need to convert to a DoubleType first because the minimum function for PySpark dataframes apparently doesn't work for timestamps.

It does, in fact:

df = spark.createDataFrame(
    ["0001-01-02 00:00:00", "0001-01-03 00:00:00"], "string"
).selectExpr("to_timestamp(value) AS x")

min_max_df = df.select(sfunc.min("x"), sfunc.max("x"))
min_max_df.show()
# +-------------------+-------------------+
# |             min(x)|             max(x)|
# +-------------------+-------------------+
# |0001-01-02 00:00:00|0001-01-03 00:00:00|
# +-------------------+-------------------+

What actually fails is the conversion to a local Python value:

>>> min_max_df.first()
Traceback (most recent call last):
...
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
ValueError: year 0 is out of range

The epoch timestamp of the minimum value is

>>> df.select(sfunc.col("x").cast("long")).first().x
-62135683200

which, when converted back to a date, appears to be shifted back by two days (Scala code):

scala> java.time.Instant.ofEpochSecond(-62135683200L)
res0: java.time.Instant = 0000-12-31T00:00:00Z

and is therefore no longer valid in Python.
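(A quick way to see the limitation on the Python side; datetime simply cannot represent a year before 1:)

import datetime

print(datetime.MINYEAR)       # 1
datetime.datetime(0, 12, 31)  # ValueError: year 0 is out of range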

Assuming 0001 is just a placeholder, you can ignore it while parsing:

df.select(
    sfunc.to_timestamp(
        sfunc.col("x").cast("string"),
        "0001-MM-dd HH:mm:ss"
    ).alias("x")
).select(
    sfunc.min("x"),
    sfunc.max("x")
).first()
# Row(min(x)=datetime.datetime(1970, 1, 2, 1, 0), max(x)=datetime.datetime(1970, 1, 3, 1, 0))
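Because the year in the pattern is matched as a literal rather than parsed, the missing year field falls back to its default (1970 here), as the result shows.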

You can also cast the result directly to a string:

df.select(sfunc.min("x").cast("string"), sfunc.max("x").cast("string")).first()
# Row(CAST(min(x) AS STRING)='0001-01-02 00:00:00', CAST(max(x) AS STRING)='0001-01-03 00:00:00')
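Casting to a string keeps the formatting on the JVM side, so the year-0001 value never has to round-trip through Python's datetime.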

Regarding apache-spark - PySpark: cannot do column operations with datetime year = 0001, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50885719/
