gpt4 book ai didi

scala - 使用 UDF 映射有条件地创建新列时出现 java.io.NotSerializedException : org. apache.spark.sql.Column

转载 作者:行者123 更新时间:2023-12-03 09:02:27 24 4
gpt4 key购买 nike

我有一个带有startTime的设备ID数据和一些特征向量,需要根据hourweekday_hour进行合并。样本数据如下:

+-----+-------------------+--------------------+
|hh_id| startTime| hash|
+-----+-------------------+--------------------+
|dev01|2016-10-10 00:01:04|(1048576,[121964,...|
|dev02|2016-10-10 00:17:45|(1048576,[121964,...|
|dev01|2016-10-10 00:18:01|(1048576,[121964,...|
|dev10|2016-10-10 00:19:48|(1048576,[121964,...|
|dev05|2016-10-10 00:20:00|(1048576,[121964,...|
|dev08|2016-10-10 00:45:13|(1048576,[121964,...|
|dev05|2016-10-10 00:56:25|(1048576,[121964,...|

这些功能基本上都是 SparseVector,它们是通过自定义函数合并的。当我尝试通过以下方式创建 key 列时:

val columnMap = Map("hour" -> hour($"startTime"), "weekday_hour" -> getWeekdayHourUDF($"startTime"))
val grouping = "hour"
val newDF = oldDF.withColumn("dt_key", columnMap(grouping))

我收到一个java.io.NotSerializedException。完整的堆栈跟踪如下:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: hour(startTime))
- field (class: scala.collection.immutable.Map$Map3, name: value1, type: class java.lang.Object)
- object (class scala.collection.immutable.Map$Map3, Map(hour -> hour(startTime), weekday_hour -> UDF(startTime), none -> 0))
- field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: groupingColumnMap, type: interface scala.collection.immutable.Map)
- object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@4f1f9a63)
- field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $iw, type: class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
- object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@207d6d1e)

但是,当我尝试使用 if-else 执行相同的逻辑而不显式创建列时,我不会遇到任何此类错误。

val newDF = if(groupingKey == "hour") {
oldDF.withColumn("dt_key", hour($"startTime")
} else {
oldDF.withColumn("dt_key", getWeekdayHourUDF($"startTime")
}

使用Map方式确实很方便,因为可能有更多类型的 key 提取方法。请帮我找出导致此问题的原因。

最佳答案

也许有点晚了,但我使用的是 Spark 2.4.6,无法重现该问题。我猜测代码会调用 columnMap 来获取多个键。如果您提供一个易于重现的示例,包括数据(1 行数据集就足够了),这会有所帮助。但是,正如堆栈跟踪所示,Column 类确实不是 Serialized,我将尝试根据我目前的理解进行详细说明。

TLDR;避免这种情况的一种简单方法是将 val 转换为 def


我相信,为什么用 when 案例或 UDF 表达同样的事情是有效的,这一点已经很清楚了。

第一次尝试:类似的东西可能不起作用的原因是(a)Column类不可序列化(我认为这是一个有意识的设计选择鉴于其在 Spark API 中的预期角色),并且 (b) 表达式中没有任何内容

oldDF.withColumn("dt_key", columnMap(grouping))

它告诉 Spark withColumn 的第二个参数的实际具体 Column 是什么,这意味着具体的 Map[String, Column]当引发此类异常时,需要通过网络将 code> 对象发送给执行器。

第二次尝试:第二次尝试之所以有效,是因为定义 DataFrame 所需的 groupingKey 参数可能会发生相同的决定完全取决于驱动程序。


考虑使用 DataFrame API 作为查询构建器的 Spark 代码,或者保存执行计划的东西,而不是数据本身,会有所帮助。一旦您对其调用操作(writeshowcount 等),Spark 就会生成将任务发送到执行器的代码。此时,实现 DataFrame/Dataset 所需的所有信息必须已在查询计划中正确编码,或者需要可序列化,以便可以通过网络。

def 通常可以解决此类问题,因为

def columnMap: Map[String, Column] = Map("a" -> hour($"startTime"), "weekday_hour" -> UDF($"startTime"))

不是具体的Map对象本身,而是每次调用时创建一个新的Map[String, Column]在每个执行器恰好执行涉及此Map的任务时。

Thisthis似乎是关于该主题的好资源。我承认我明白为什么使用像

这样的 Function
val columnMap = () => Map("a" -> hour($"startTime"), "b" -> UDF($"startTime"))

然后 columnMap()("a") 就可以工作了,因为反编译的字节代码显示 scala.Function 被定义为 Serialized,但我不明白为什么 def 有效,因为对于他们来说情况似乎并非如此。无论如何,我希望这会有所帮助。

关于scala - 使用 UDF 映射有条件地创建新列时出现 java.io.NotSerializedException : org. apache.spark.sql.Column,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49292921/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com