- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
目前 pyspark 格式化 logFile,然后加载 redshift。
分析输出的json格式的logFile的每一项,添加一项,加载到Redshift中。但是,某些项目的格式因类型而异。(对于同一项目,预先应用Shcema。)即使原样输出也会输入转义符。。有没有办法动态创建schema信息,输出的jsonfile没有转义符?
-- 环境--
- spark 2.4.0
- python version 2.7.15
-- 数据框 --
>> df.printSchema()
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
>> df.show(2,False)
+------+------------------------------------------------------------+
|Name |d |
+------+------------------------------------------------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2] |
+------+------------------------------------------------------------+
-- Schema(普通项目)--
>> print(json.dumps(schema.jsonValue(), indent=2))
{
"fields": [
{
"metadata": {},
"type": "string",
"name": "Name",
"nullable": false
},
{
"metadata": {},
"type": {
"keyType": "string",
"type": "map",
"valueType": "string",
"valueContainsNull": true
},
"name": "d",
"nullable": false
}
],
"type": "struct"
}
-- 代码--
from pyspark.sql.types import *
rdd = sc.parallelize([("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}), ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})])
schema = StructType([StructField('Name',StringType(), False)
,StructField('d',MapType(StringType(),StringType()), False)])
df = spark.createDataFrame(rdd, schema)
-- 输出json文件--
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}
-- 输出json文件(理想)--
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}
我尝试使用 pyspark.sql.functions 的 schema_of_json() 和 from_json(),但没有成功。(schema_of_json 只能接受字符字面量)
-- 试用结果--
from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"
最佳答案
The short answer is no, there is no way to dynamically infer the schema on each row and end up with a column where different rows have different schemas.
但是,有一种方法可以输出您想要的 json 字符串,并将不同的 json 协调成一个通用的、丰富类型的模式
如果它被允许,它会非常慢,但更重要的是它是不允许的,因为它破坏了允许 SparkSQL 一致运行的关系模型。
数据框由列(字段)组成,列只有一种数据类型;数据类型代表整个列。鉴于 Python 的性质,它在 Pyspark 中不是严格执行的,但它在运行时很重要,因此该声明仍然适用。
在你的例子中,如果你想project City
属性,比如d.Body.City
,那么两者都必须存在阿尔弗雷德和安伯。至少,即使没有值,该字段的元数据也必须存在。执行引擎需要快速知道路径是否无效,避免对每一行进行无意义的扫描。
在单个列中协调多个类型的一些方法是(我敢肯定还有更多我想不到的方法):
在这种情况下,我喜欢 (1),但 (4) 作为寻找通用模式的临时步骤可能是有效的。
您的示例“通用”json 架构更像是选项 (3)。在您称为“d”的 map 中(我猜是因为它是字典?)如果不扫描数据,有关字段的信息将不可用。
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
我意识到这只是添加一个包含 Body
的新列的临时步骤,但要做到这一点,您必须将该映射中所有可能的键枚举到更有用的模式中。
通用(通用)模式不是string -> string
的通用映射,我认为像下面这样更有用。它接近您最初尝试的但不是动态的并且对两行都有效。注意 nullable
是所有属性的默认 True
schema_body = StructType([
StructField("City", StringType()),
StructField("Country", StringType()),
StructField("Weight", IntegerType()),
StructField("Height", IntegerType())
])
df = df.withColumn("Body", from_json("d.Body", schema_body))
df.printSchema()
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- Body: struct (nullable = true)
| |-- City: string (nullable = true)
| |-- Country: string (nullable = true)
| |-- Weight: integer (nullable = true)
| |-- Height: integer (nullable = true)
df.show(2, False)
+------+---------------------------------------------------------------+---------------------+
|Name |d |Body |
+------+---------------------------------------------------------------+---------------------+
|Amber |Map(Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1)|[Oregon,US,null,null]|
|Alfred|Map(Body -> {"Weight": 80, "Height": 176}, BodyType -> 2) |[null,null,80,176] |
+------+---------------------------------------------------------------+---------------------+
现在您可以通过选择 d.Body.City
轻松到达 Body.City
,而不必担心哪些行包含 City。
对于下一步,您可以将其恢复为 json 字符串
df = df.withColumn("Body", to_json("d.Body"))
你也可以结合上一步
df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
df.printSchema()
root
|-- Name: string (nullable = false)
|-- BodyAttributes: struct (nullable = true)
| |-- Body: string (nullable = true)
| |-- BodyType: integer (nullable = true)
|-- Body: string (nullable = true)
df.show(2, False)
+------+---------------------------------------+--------------------------------+
|Name |BodyAttributes |Body |
+------+---------------------------------------+--------------------------------+
|Amber |[{"City": "Oregon", "Country": "US"},1]|{"City":"Oregon","Country":"US"}|
|Alfred|[{"Weight": 80, "Height": 176},2] |{"Weight":80,"Height":176} |
+------+---------------------------------------+--------------------------------+
请注意,将其转换回 json 字符串时,那些 NULL 值将消失。它现在也是一个 jsonstring,很容易根据需要写入文件。
如果您这样做是为了使数据可用于分析、报告或其他目的,我会做这样的事情
schema = StructType([
StructField('Name',StringType(), False),
StructField(
'd',
StructType([
StructField("Body", StringType()),
StructField("BodyType", IntegerType())
])
)
])
df = spark.createDataFrame(rdd, schema)
df = df.withColumn(
"Body",
from_json("d.Body", schema_body)
).withColumn(
"BodyType",
col("d.BodyType")
).drop("d")
df.printSchema()
root
|-- Name: string (nullable = false)
|-- Body: struct (nullable = true)
| |-- City: string (nullable = true)
| |-- Country: string (nullable = true)
| |-- Weight: integer (nullable = true)
| |-- Height: integer (nullable = true)
|-- BodyType: integer (nullable = true)
df.show(2, False)
+------+---------------------+--------+
|Name |Body |BodyType|
+------+---------------------+--------+
|Amber |[Oregon,US,null,null]|1 |
|Alfred|[null,null,80,176] |2 |
+------+---------------------+--------+
然后你可以选择Body.City
, Body.Country
, Body.Weight,
Body.Height`
您可以再走一步,但这实际上取决于这些可能的 Body 键有多少以及它有多稀疏。
df = df.withColumn(
"City", col("Body.City")
).withColumn(
"Country", col("Body.Country")
).withColumn(
"Weight", col("Body.Weight")
).withColumn(
"Height", col("Body.Height")
).drop("Body")
df.printSchema()
root
|-- Name: string (nullable = false)
|-- BodyType: integer (nullable = true)
|-- City: string (nullable = true)
|-- Country: string (nullable = true)
|-- Weight: integer (nullable = true)
|-- Height: integer (nullable = true)
df.show(2, False)
+------+--------+------+-------+------+------+
|Name |BodyType|City |Country|Weight|Height|
+------+--------+------+-------+------+------+
|Amber |1 |Oregon|US |null |null |
|Alfred|2 |null |null |80 |176 |
+------+--------+------+-------+------+------+
关于python - 有没有一种方法可以使用 pyspark 动态创建模式信息,而不是在输出 jsonfile 中转义字符?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53955289/
我需要将文本放在 中在一个 Div 中,在另一个 Div 中,在另一个 Div 中。所以这是它的样子: #document Change PIN
奇怪的事情发生了。 我有一个基本的 html 代码。 html,头部, body 。(因为我收到了一些反对票,这里是完整的代码) 这是我的CSS: html { backgroun
我正在尝试将 Assets 中的一组图像加载到 UICollectionview 中存在的 ImageView 中,但每当我运行应用程序时它都会显示错误。而且也没有显示图像。 我在ViewDidLoa
我需要根据带参数的 perl 脚本的输出更改一些环境变量。在 tcsh 中,我可以使用别名命令来评估 perl 脚本的输出。 tcsh: alias setsdk 'eval `/localhome/
我使用 Windows 身份验证创建了一个新的 Blazor(服务器端)应用程序,并使用 IIS Express 运行它。它将显示一条消息“Hello Domain\User!”来自右上方的以下 Ra
这是我的方法 void login(Event event);我想知道 Kotlin 中应该如何 最佳答案 在 Kotlin 中通配符运算符是 * 。它指示编译器它是未知的,但一旦知道,就不会有其他类
看下面的代码 for story in book if story.title.length < 140 - var story
我正在尝试用 C 语言学习字符串处理。我写了一个程序,它存储了一些音乐轨道,并帮助用户检查他/她想到的歌曲是否存在于存储的轨道中。这是通过要求用户输入一串字符来完成的。然后程序使用 strstr()
我正在学习 sscanf 并遇到如下格式字符串: sscanf("%[^:]:%[^*=]%*[*=]%n",a,b,&c); 我理解 %[^:] 部分意味着扫描直到遇到 ':' 并将其分配给 a。:
def char_check(x,y): if (str(x) in y or x.find(y) > -1) or (str(y) in x or y.find(x) > -1):
我有一种情况,我想将文本文件中的现有行包含到一个新 block 中。 line 1 line 2 line in block line 3 line 4 应该变成 line 1 line 2 line
我有一个新项目,我正在尝试设置 Django 调试工具栏。首先,我尝试了快速设置,它只涉及将 'debug_toolbar' 添加到我的已安装应用程序列表中。有了这个,当我转到我的根 URL 时,调试
在 Matlab 中,如果我有一个函数 f,例如签名是 f(a,b,c),我可以创建一个只有一个变量 b 的函数,它将使用固定的 a=a1 和 c=c1 调用 f: g = @(b) f(a1, b,
我不明白为什么 ForEach 中的元素之间有多余的垂直间距在 VStack 里面在 ScrollView 里面使用 GeometryReader 时渲染自定义水平分隔线。 Scrol
我想知道,是否有关于何时使用 session 和 cookie 的指南或最佳实践? 什么应该和什么不应该存储在其中?谢谢! 最佳答案 这些文档很好地了解了 session cookie 的安全问题以及
我在 scipy/numpy 中有一个 Nx3 矩阵,我想用它制作一个 3 维条形图,其中 X 轴和 Y 轴由矩阵的第一列和第二列的值、高度确定每个条形的 是矩阵中的第三列,条形的数量由 N 确定。
假设我用两种不同的方式初始化信号量 sem_init(&randomsem,0,1) sem_init(&randomsem,0,0) 现在, sem_wait(&randomsem) 在这两种情况下
我怀疑该值如何存储在“WORD”中,因为 PStr 包含实际输出。? 既然Pstr中存储的是小写到大写的字母,那么在printf中如何将其给出为“WORD”。有人可以吗?解释一下? #include
我有一个 3x3 数组: var my_array = [[0,1,2], [3,4,5], [6,7,8]]; 并想获得它的第一个 2
我意识到您可以使用如下方式轻松检查焦点: var hasFocus = true; $(window).blur(function(){ hasFocus = false; }); $(win
我是一名优秀的程序员,十分优秀!