gpt4 book ai didi

python - 有没有一种方法可以使用 pyspark 动态创建模式信息,而不是在输出 jsonfile 中转义字符?

转载 作者:太空狗 更新时间:2023-10-30 01:36:07 25 4
gpt4 key购买 nike

目前 pyspark 格式化 logFile,然后加载 redshift。

分析输出的json格式的logFile的每一项,添加一项,加载到Redshift中。但是,某些项目的格式因类型而异。(对于同一项目,预先应用Shcema。)即使原样输出也会输入转义符。。有没有办法动态创建schema信息,输出的jsonfile没有转义符?

-- 环境--

- spark 2.4.0
- python version 2.7.15

-- 数据框 --

>> df.printSchema()
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)

>> df.show(2,False)
+------+------------------------------------------------------------+
|Name |d |
+------+------------------------------------------------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2] |
+------+------------------------------------------------------------+

-- Schema(普通项目)--

>> print(json.dumps(schema.jsonValue(), indent=2))
{
"fields": [
{
"metadata": {},
"type": "string",
"name": "Name",
"nullable": false
},
{
"metadata": {},
"type": {
"keyType": "string",
"type": "map",
"valueType": "string",
"valueContainsNull": true
},
"name": "d",
"nullable": false
}
],
"type": "struct"
}

-- 代码--

from pyspark.sql.types import *

rdd = sc.parallelize([("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}), ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})])
schema = StructType([StructField('Name',StringType(), False)
,StructField('d',MapType(StringType(),StringType()), False)])
df = spark.createDataFrame(rdd, schema)

-- 输出json文件--

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}

-- 输出json文件(理想)--

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}

我尝试使用 pyspark.sql.functions 的 schema_of_json() 和 from_json(),但没有成功。(schema_of_json 只能接受字符字面量)

-- 试用结果--

from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"

最佳答案

tl;dr

The short answer is no, there is no way to dynamically infer the schema on each row and end up with a column where different rows have different schemas.

但是,有一种方法可以输出您想要的 json 字符串,并将不同的 json 协调成一个通用的、丰富类型的模式

详情

如果它被允许,它会非常慢,但更重要的是它是不允许的,因为它破坏了允许 SparkSQL 一致运行的关系模型。

数据框由列(字段)组成,列只有一种数据类型;数据类型代表整个列。鉴于 Python 的性质,它在 Pyspark 中不是严格执行的,但它在运行时很重要,因此该声明仍然适用。

在你的例子中,如果你想project City 属性,比如d.Body.City,那么两者都必须存在阿尔弗雷德和安伯。至少,即使没有值,该字段的元数据也必须存在。执行引擎需要快速知道路径是否无效,避免对每一行进行无意义的扫描。

在单个列中协调多个类型的一些方法是(我敢肯定还有更多我想不到的方法):

  1. 使用变体/联合/选项类型(例如,将所有常见和不常见的 json 模式联合在一起)
  2. 将它序列化为某种东西,例如 json 字符串(这是您在应用 jsonschema 之前开始的地方,非常适合传输数据,但不适合分析)
  3. 将其向上转换为具有最低公分母行为/接口(interface)/属性(丢失元数据和子类型的属性)的父类(super class)型、盒装或通用对象(如在 RDD 中)
  4. 不要将其存储为单一类型,例如将不同的变体存储到不同的列中,并在每个列上使用不同的 json 模式

在这种情况下,我喜欢 (1),但 (4) 作为寻找通用模式的临时步骤可能是有效的。

您的示例“通用”json 架构更像是选项 (3)。在您称为“d”的 map 中(我猜是因为它是字典?)如果不扫描数据,有关字段的信息将不可用。

root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)

我意识到这只是添加一个包含 Body 的新列的临时步骤,但要做到这一点,您必须将该映射中所有可能的键枚举到更有用的模式中。

解决方案

通用(通用)模式不是string -> string 的通用映射,我认为像下面这样更有用。它接近您最初尝试的但不是动态的并且对两行都有效。注意 nullable 是所有属性的默认 True

schema_body = StructType([
StructField("City", StringType()),
StructField("Country", StringType()),
StructField("Weight", IntegerType()),
StructField("Height", IntegerType())
])

df = df.withColumn("Body", from_json("d.Body", schema_body))
df.printSchema()

root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- Body: struct (nullable = true)
| |-- City: string (nullable = true)
| |-- Country: string (nullable = true)
| |-- Weight: integer (nullable = true)
| |-- Height: integer (nullable = true)


df.show(2, False)

+------+---------------------------------------------------------------+---------------------+
|Name |d |Body |
+------+---------------------------------------------------------------+---------------------+
|Amber |Map(Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1)|[Oregon,US,null,null]|
|Alfred|Map(Body -> {"Weight": 80, "Height": 176}, BodyType -> 2) |[null,null,80,176] |
+------+---------------------------------------------------------------+---------------------+

现在您可以通过选择 d.Body.City 轻松到达 Body.City,而不必担心哪些行包含 City。

对于下一步,您可以将其恢复为 json 字符串

df = df.withColumn("Body", to_json("d.Body"))

你也可以结合上一步

df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
df.printSchema()
root
|-- Name: string (nullable = false)
|-- BodyAttributes: struct (nullable = true)
| |-- Body: string (nullable = true)
| |-- BodyType: integer (nullable = true)
|-- Body: string (nullable = true)

df.show(2, False)

+------+---------------------------------------+--------------------------------+
|Name |BodyAttributes |Body |
+------+---------------------------------------+--------------------------------+
|Amber |[{"City": "Oregon", "Country": "US"},1]|{"City":"Oregon","Country":"US"}|
|Alfred|[{"Weight": 80, "Height": 176},2] |{"Weight":80,"Height":176} |
+------+---------------------------------------+--------------------------------+

请注意,将其转换回 json 字符串时,那些 NULL 值将消失。它现在也是一个 jsonstring,很容易根据需要写入文件。


更进一步

如果您这样做是为了使数据可用于分析、报告或其他目的,我会做这样的事情

schema = StructType([
StructField('Name',StringType(), False),
StructField(
'd',
StructType([
StructField("Body", StringType()),
StructField("BodyType", IntegerType())
])
)
])

df = spark.createDataFrame(rdd, schema)
df = df.withColumn(
"Body",
from_json("d.Body", schema_body)
).withColumn(
"BodyType",
col("d.BodyType")
).drop("d")

df.printSchema()

root
|-- Name: string (nullable = false)
|-- Body: struct (nullable = true)
| |-- City: string (nullable = true)
| |-- Country: string (nullable = true)
| |-- Weight: integer (nullable = true)
| |-- Height: integer (nullable = true)
|-- BodyType: integer (nullable = true)


df.show(2, False)

+------+---------------------+--------+
|Name |Body |BodyType|
+------+---------------------+--------+
|Amber |[Oregon,US,null,null]|1 |
|Alfred|[null,null,80,176] |2 |
+------+---------------------+--------+

然后你可以选择Body.City, Body.Country, Body.Weight,Body.Height`

您可以再走一步,但这实际上取决于这些可能的 Body 键有多少以及它有多稀疏。

df = df.withColumn(
"City", col("Body.City")
).withColumn(
"Country", col("Body.Country")
).withColumn(
"Weight", col("Body.Weight")
).withColumn(
"Height", col("Body.Height")
).drop("Body")

df.printSchema()

root
|-- Name: string (nullable = false)
|-- BodyType: integer (nullable = true)
|-- City: string (nullable = true)
|-- Country: string (nullable = true)
|-- Weight: integer (nullable = true)
|-- Height: integer (nullable = true)

df.show(2, False)

+------+--------+------+-------+------+------+
|Name |BodyType|City |Country|Weight|Height|
+------+--------+------+-------+------+------+
|Amber |1 |Oregon|US |null |null |
|Alfred|2 |null |null |80 |176 |
+------+--------+------+-------+------+------+

关于python - 有没有一种方法可以使用 pyspark 动态创建模式信息,而不是在输出 jsonfile 中转义字符?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53955289/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com