
apache-spark - Schema evolution for complex types


What is the state of schema evolution for arrays of structs (complex types) in Spark?

I know that for ordinary simple types, ORC or Parquet handles this fine (adding a new column), but so far I could not find any documentation covering the case I need.

My use case is a structure similar to this:

user_id,date,[{event_time, foo, bar, baz, tag1, tag2, ... future_tag_n}, ...]

And I want to be able to add new fields to the struct inside the array.

Would a Map (key-value) complex type cause inefficiencies? It would at least guarantee that adding new fields (tags) stays flexible.
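
For illustration, a minimal sketch of what the map-based variant could look like (the case class and field names are hypothetical): new tags then require no schema change at all, at the price of all tag values sharing a single type.

// Hypothetical modelling of the tags as a map: new keys need no schema evolution,
// but all values are forced to one type (here String).
case class EventWithTags(event_time: String, foo: String, bar: String, baz: String,
                         tags: Map[String, String])
case class UserRow(user_id: Int, date: String, events: Seq[EventWithTags])

val dfTags = Seq(
  UserRow(1, "2019-01-01",
    Seq(EventWithTags("10:00:00", "f", "b", "z", Map("tag1" -> "a", "tag2" -> "b"))))
).toDF
dfTags.printSchema // tags shows up as map<string,string>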

Edit

case class BarFirst(baz:Int, foo:String)
case class BarSecond(baz:Int, foo:String, moreColumns:Int, oneMore:String)
case class BarSecondNullable(baz:Int, foo:String, moreColumns:Option[Int], oneMore:Option[String])
case class Foo(i:Int, date:String, events:Seq[BarFirst])
case class FooSecond(i:Int, date:String, events:Seq[BarSecond])
case class FooSecondNullable(i:Int, date:String, events:Seq[BarSecondNullable])
val dfInitial = Seq(Foo(1, "2019-01-01", Seq(BarFirst(1, "asdf")))).toDF
dfInitial.printSchema
dfInitial.show

root
|-- i: integer (nullable = false)
|-- date: string (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = false)
| | |-- foo: string (nullable = true)


scala> dfInitial.show
+---+----------+----------+
| i| date| events|
+---+----------+----------+
| 1|2019-01-01|[[1,asdf]]|
+---+----------+----------+

dfInitial.write.partitionBy("date").parquet("my_df.parquet")

tree my_df.parquet
my_df.parquet
├── _SUCCESS
└── date=2019-01-01
└── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet


val evolved = Seq(FooSecond(2, "2019-01-02", Seq(BarSecond(1, "asdf", 11, "oneMore")))).toDF
evolved.printSchema
evolved.show

scala> evolved.printSchema
root
|-- i: integer (nullable = false)
|-- date: string (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = false)
| | |-- foo: string (nullable = true)
| | |-- moreColumns: integer (nullable = false)
| | |-- oneMore: string (nullable = true)


scala> evolved.show
+---+----------+--------------------+
| i| date| events|
+---+----------+--------------------+
| 1|2019-01-02|[[1,asdf,11,oneMo...|
+---+----------+--------------------+

import org.apache.spark.sql._
evolved.write.mode(SaveMode.Append).partitionBy("date").parquet("my_df.parquet")
my_df.parquet
├── _SUCCESS
├── date=2019-01-01
│   └── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet
└── date=2019-01-02
└── part-00000-64e65d05-3f33-430e-af66-f1f82c23c155.c000.snappy.parquet

val df = spark.read.parquet("my_df.parquet")
df.printSchema
scala> df.printSchema
root
|-- i: integer (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = true)
| | |-- foo: string (nullable = true)
|-- date: date (nullable = true)

The additional columns are missing! Why?

df.show
df.as[FooSecond].collect // AnalysisException: No such struct field moreColumns in baz, foo
df.as[FooSecondNullable].collect // AnalysisException: No such struct field moreColumns in baz, foo

This behavior was observed with Spark 2.2.3_2.11 and 2.4.2_2.12.

Best answer

When the code from the edit above is executed with schema merging turned off, the new columns are not loaded. With schema merging enabled:

val df = spark.read.option("mergeSchema", "true").parquet("my_df.parquet")
scala> df.printSchema
root
|-- i: integer (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = true)
| | |-- foo: string (nullable = true)
| | |-- moreColumns: integer (nullable = true)
| | |-- oneMore: string (nullable = true)
|-- date: date (nullable = true)

df.as[FooSecond].collect // fails with a NullPointerException as expected; Option must be used
df.as[FooSecondNullable].collect // works fine
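
As a side note, instead of passing the option on every read, the same behavior can be enabled for the whole session via the standard Spark SQL configuration key; this is only a sketch:

// Enables Parquet schema merging for all Parquet reads in this session;
// as noted below, merging schemas is relatively expensive.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
val dfMerged = spark.read.parquet("my_df.parquet")
dfMerged.printSchema // includes moreColumns and oneMore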

Now using Hive:

evolved.write.mode(SaveMode.Append).partitionBy("date").saveAsTable("my_df")

This seems to work fine (no exception), but when trying to read the data back:

spark.sql("describe my_df").show(false)
+-----------------------+---------------------------------+-------+
|col_name |data_type |comment|
+-----------------------+---------------------------------+-------+
|i |int |null |
|events |array<struct<baz:int,foo:string>>|null |
|date |string |null |
|# Partition Information| | |
|# col_name |data_type |comment|
|date |string |null |
+-----------------------+---------------------------------+-------+
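
One way to narrow this down is to bypass the metastore and read the files behind the Hive table directly; the path below is an assumption (the default spark-warehouse location), not taken from the original post:

// Hypothetical check: read the table's underlying Parquet files directly with
// schema merging, to see whether the new struct fields were written at all.
spark.read
  .option("mergeSchema", "true")
  .parquet("spark-warehouse/my_df")
  .printSchema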

When not using arrays of structs but only basic (top-level) types:

// Case classes for this flat example (field names inferred from the error message below)
case class Foo(first: Int, dt: String)
case class FooEvolved(first: Int, second: Int, dt: String)

val first = Seq(Foo(1, "2019-01-01")).toDF
first.printSchema
first.write.partitionBy("dt").saveAsTable("df")
val evolved = Seq(FooEvolved(1, 2, "2019-01-02")).toDF
evolved.printSchema
evolved.write.mode(SaveMode.Append).partitionBy("dt").saveAsTable("df")
org.apache.spark.sql.AnalysisException: The column number of the existing table default.df(struct<first:int,dt:string>) doesn't match the data schema(struct<first:int,second:int,dt:string>);

There is a clear error message. Question: is it still possible to evolve the schema in Hive, or does the schema have to be adjusted manually?
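
If manual adjustment is acceptable, one possibility for the flat example is Spark's ALTER TABLE ... ADD COLUMNS statement (available since Spark 2.2); this is only a sketch, it does not cover adding fields inside a nested struct, and whether the subsequent append then succeeds would need to be verified:

// Sketch of a manual schema change for the flat example: add the new column to the
// table definition first, then retry appending the evolved DataFrame.
spark.sql("ALTER TABLE df ADD COLUMNS (second INT)")
evolved.write.mode(SaveMode.Append).partitionBy("dt").saveAsTable("df")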

Conclusion

Schema evolution for arrays of structs is supported, but the merge option must be turned on when reading the files, and it seems to work out of the box only when reading the files directly, without Hive.

When reading through Hive, only the old schema is returned, because the new columns appear to be silently dropped at write time.

Schema evolution in parquet format (manually creating views; as an added benefit, schema evolution that Parquet itself does not support, such as renames or data type changes, becomes possible) looks like an interesting alternative, because setting the merge-schema option to true is quite resource-intensive, and the approach works for all SQL engines on Hadoop.
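
Along the same lines, here is a sketch of a workaround that avoids merging schemas on every read: pass the evolved schema explicitly when reading and register the result under a stable name (the schema and the view name below are assumptions derived from the example above; older files should then simply return null for the new fields):

import org.apache.spark.sql.types._

// Hypothetical explicit schema matching the evolved example above.
val evolvedSchema = StructType(Seq(
  StructField("i", IntegerType),
  StructField("events", ArrayType(StructType(Seq(
    StructField("baz", IntegerType),
    StructField("foo", StringType),
    StructField("moreColumns", IntegerType),
    StructField("oneMore", StringType))))),
  StructField("date", DateType)))

val dfExplicit = spark.read.schema(evolvedSchema).parquet("my_df.parquet")
dfExplicit.createOrReplaceTempView("my_df_view") // stable name for downstream SQL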

Regarding apache-spark - schema evolution for complex types, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55508402/
