json - Reading a nested JSON file in pyspark

Reposted. Author: 行者123. Updated: 2023-12-03 15:49:10

I want to create a pyspark DataFrame from a JSON file stored in HDFS.

The JSON file has the following content:

{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" }, "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }



Then I read this file using pyspark 2.4.4:
df = spark.read.json("/path/file.json")

So I get the following result:
df.show(truncate=False)
+---------------------+---------------------------------+
|Price |Product |
+---------------------+---------------------------------+
|[700, 250, 800, 1200]|[Desktop, Tablet, Iphone, Laptop]|
+---------------------+---------------------------------+

But I want a DataFrame with the following structure:
+-------+--------+
|Price |Product |
+-------+--------+
|700 |Desktop |
|250 |Tablet |
|800 |Iphone |
|1200 |Laptop |
+-------+--------+

How can I get a DataFrame with the structure above using pyspark?

I tried using explode, df.select(explode("Price")), but I got the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:

Py4JJavaError: An error occurred while calling o688.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;
'Project [explode(Price#107) AS List()]
+- LogicalRDD [Price#107, Product#108], false

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:97)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3301)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1312)
at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException Traceback (most recent call last)
<ipython-input-46-463397adf153> in <module>
----> 1 df.select(explode("Price"))

/usr/lib/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
1200 [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
1201 """
-> 1202 jdf = self._jdf.select(self._jcols(*cols))
1203 return DataFrame(jdf, self.sql_ctx)
1204

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;\n'Project [explode(Price#107) AS List()]\n+- LogicalRDD [Price#107, Product#108], false\n"

Best answer

Recreating your DataFrame:

from pyspark.sql import functions as F

df = spark.read.json("./row.json")
df.printSchema()
#root
# |-- Price: struct (nullable = true)
# | |-- 0: long (nullable = true)
# | |-- 1: long (nullable = true)
# | |-- 2: long (nullable = true)
# | |-- 3: long (nullable = true)
# |-- Product: struct (nullable = true)
# | |-- 0: string (nullable = true)
# | |-- 1: string (nullable = true)
# | |-- 2: string (nullable = true)
# | |-- 3: string (nullable = true)

As the printSchema output above shows, your Price and Product columns are structs. explode will therefore not work, because it requires an ArrayType or a MapType.
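To see where the structs come from, it can help to look at the same JSON with Python's json module. This is only an illustrative sketch using the standard library, not Spark code: each top-level value is a JSON object keyed by row index rather than a JSON array, and Spark's schema inference maps JSON objects to struct columns.

```python
import json

# The document from the question, inlined so the sketch is self-contained.
raw = """
{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" },
  "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }
"""

data = json.loads(raw)

# Each column is a JSON object (a dict in Python) keyed by row index,
# not a JSON array (a list in Python).
shapes = {column: type(values).__name__ for column, values in data.items()}
print(shapes)

# The keys "0".."3" are what appear as the struct's field names in printSchema.
price_fields = sorted(data["Price"])
print(price_fields)
```

Had the file stored each column as a JSON array instead, the columns would have been inferred as arrays and explode could have been applied directly.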

First, convert the structs to arrays using the .* notation, as shown in "Querying Spark SQL DataFrame with complex types":
df = df.select(
F.array(F.expr("Price.*")).alias("Price"),
F.array(F.expr("Product.*")).alias("Product")
)

df.printSchema()

#root
# |-- Price: array (nullable = false)
# | |-- element: long (containsNull = true)
# |-- Product: array (nullable = false)
# | |-- element: string (containsNull = true)

Now, since you are using Spark 2.4+, you can use arrays_zip to zip the Price and Product arrays together before using explode:
df.withColumn("price_product", F.explode(F.arrays_zip("Price", "Product")))\
.select("price_product.Price", "price_product.Product")\
.show()

#+-----+----------------+
#|Price| Product|
#+-----+----------------+
#| 700|Desktop Computer|
#| 250| Tablet|
#| 800| iPhone|
#| 1200| Laptop|
#+-----+----------------+
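
Conceptually, arrays_zip pairs up the elements at each position and explode then emits one row per pair. A plain-Python analogue of those two steps (an illustration only, not Spark code; the variable names are made up for this sketch) looks like this:

```python
from itertools import zip_longest

prices = [700, 250, 800, 1200]
products = ["Desktop Computer", "Tablet", "iPhone", "Laptop"]

# arrays_zip analogue: one struct-like dict per position.
# zip_longest pads the shorter input with None, mirroring how arrays_zip
# fills missing elements with null when the arrays differ in length.
zipped = [{"Price": p, "Product": q} for p, q in zip_longest(prices, products)]

# explode analogue: one output row per element of the zipped array.
rows = [(item["Price"], item["Product"]) for item in zipped]
for row in rows:
    print(row)
```

Zipping before exploding is what keeps each price next to its product. Exploding the two arrays one after the other would instead produce every price/product combination.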

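For older versions of Spark, from before arrays_zip was available, you can explode each column separately and join the results back together. Note that this relies on monotonically_increasing_id producing matching ids in both DataFrames, which is not guaranteed in general: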
df1 = df\
.withColumn("price_map", F.explode("Price"))\
.withColumn("id", F.monotonically_increasing_id())\
.drop("Price", "Product")

df2 = df\
.withColumn("product_map", F.explode("Product"))\
.withColumn("id", F.monotonically_increasing_id())\
.drop("Price", "Product")

df3 = df1.join(df2, "id", "outer").drop("id")

df3.show()

#+---------+----------------+
#|price_map| product_map|
#+---------+----------------+
#| 700|Desktop Computer|
#| 250| Tablet|
#| 1200| Laptop|
#| 800| iPhone|
#+---------+----------------+
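
If the file is small enough to load on the driver, another option is to pivot the column-oriented JSON into row tuples in plain Python before handing it to Spark. This is a minimal sketch under that assumption, not part of the original answer; columns_to_rows is a hypothetical helper, and it assumes the inner keys are integer strings such as "0", "1", "2".

```python
import json


def columns_to_rows(columns, order):
    """Pivot {column: {row_index: value}} into a list of row tuples.

    `order` lists the column names in the order they should appear in each row.
    Row indices are assumed to be integer strings and are sorted numerically.
    """
    row_ids = sorted({key for col in columns.values() for key in col}, key=int)
    # A column missing a given row index contributes None for that row.
    return [tuple(columns[name].get(rid) for name in order) for rid in row_ids]


raw = """
{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" },
  "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }
"""

rows = columns_to_rows(json.loads(raw), ["Price", "Product"])
print(rows)

# With an existing SparkSession named `spark` (assumed, not created here),
# the rows could then be passed to:
#   spark.createDataFrame(rows, ["Price", "Product"])
```

This keeps the row order explicit and avoids relying on generated ids, at the cost of reading the whole file into driver memory.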

Regarding json - Reading a nested JSON file in pyspark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57811415/
