- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在从 Spark 读取一个 dynamodb 表,该表的一个字段中有一个 JSON 字符串,其他字段中有字符串。我能够读取 JSON 字段但不能读取嵌套的 JSON 字段。这不是 query Json Column using dataframes 的重复.这个问题确实解释了如何从 JSON 字符串中提取列而不是嵌套的 JSON 列。
import com.github.traviscrawford.spark.dynamodb._
val users = sqlContext.read.dynamodb("Dynamodb_table")
用户.show(1)
样本数据集
|col1 | ID | field2|field3|
-------------------------------------------------------------------------------------
|{"a":[{"b":"value1","x":23},{"b":value2,"x":52}],"c":"valC"}|A1 | X1 |Y1 |
我需要从 col1(JSON 结构)和 ID 字段中提取几个字段。我能够弄清楚如何解析 JSON 字段 (col1) 并按照说明从 col1 获取字段“c”here但无法提取嵌套字段。
我的代码:
val users = sqlContext.read.dynamodb("Dynamodb_table")
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID")
data.show(1,false)
|a |c |ID|
---------------------------------------------------------
|[{"b":"value1","x":23},{"b":value2","x":52}...]|valC|A1|
现在,当我尝试在上述数据框上应用相同的 get_json_object 时,我得到了所有空值。
val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID")
nestedData.show(false)
|get_json_object(a, '$.b')| c | ID|
------------------------------------
|null |valC|A1 |
我也试过爆炸,因为 col 'a' 有数组和结构。但这也不起作用,因为数据框“data”正在将 col/field“a”作为字符串而不是数组返回。有什么解决办法吗?
更新:我还尝试使用 JSON4s 和 net.liftweb.json.parse 进行解析。这也没有帮助
case class aInfo(b: String)
case class col1(a: Option[aInfo]), c: String)
import net.liftweb.json.parse
val parseJson = udf((data: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(data).extract[Data]
})
val parsed = users.withColumn("parsedJSON", parseJson($"data"))
parsed.show(1)
当我使用这些解析器时,所有值都为 null。
我的预期结果:我试图从数据集中得到一个扁平化的结构
|b |x |c | ID|
--------------------
|value1|23|valC|A1 |
|value2|52|valC|A1 |
最佳答案
我相信所有需要的拼图都已经在这里了,所以让我们一步一步来。您的数据相当于:
val df = Seq((
"""{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1"
)).toDF("col1", "ID", "field2", "field3")
Spark 提供了 json4s,它实现了与 Lift 相同的查询 API:
import org.json4s._
import org.json4s.jackson.JsonMethods._
我们可以使用例如 LINQ 风格的 API 来定义一个 UDF:
val getBs = udf((s: String) => for {
JString(b) <- parse(s) \ "a" \ "b"
} yield b)
如果你想提取多个字段,你当然可以扩展它。例如,如果 JSON 字符串有多个字段
{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"}
你可以:
for {
JObject(a) <- parse(s) \ "a"
JField("b", JString(b)) <- a
JField("d", JInt(d)) <- a
} yield (b, d)
这假定两个字段都存在,否则将不会匹配。要处理缺失的字段,您可能更喜欢 XPath-like表达式或提取器:
case class A(b: Option[String], d: Option[Int])
(parse(s) \ "a").extract(Seq[A])
像这样的 UDF 可以与 explode
一起使用来提取字段:
val withBs = df.withColumn("b", explode(getBs($"col1")))
结果:
+--------------------+---+------+------+------+
| col1| ID|field2|field3| b|
+--------------------+---+------+------+------+
|{"a":[{"b":"value...| A1| X1| Y1|value1|
|{"a":[{"b":"value...| A1| X1| Y1|value2|
+--------------------+---+------+------+------+
您尝试使用 Lift 是不正确的,因为您希望 a
是 aInfo
的序列,但仅将其定义为 Option[aInfo]
。它应该是 Option[Seq[aInfo]]
:
case class col1(a: Option[Seq[aInfo]], c: String)
使用这样定义的类,解析应该可以正常工作。
如果您使用当前版本 (Spark 2.1.0),则 SPARK-17699 引入了一个 from_json
方法这需要一个架构:
import org.apache.spark.sql.types._
val bSchema = StructType(Seq(StructField("b", StringType, true)))
val aSchema = StructField("a", ArrayType(bSchema), true)
val cSchema = StructField("c", StringType, true)
val schema = StructType(Seq(aSchema, cSchema))
并且可以应用为:
import org.apache.spark.sql.functions.from_json
val parsed = df.withColumn("col1", from_json($"col1", schema))
之后您可以使用通常的符号选择字段:
parsed.select($"col1.a.b")
关于apache-spark - 使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39924514/
我正在使用 Boto3 和 Python 开发 Dyanamodb。我发现的问题之一是我们应该何时使用 dynamodb.client、dynamodb.resource 和 dynamodb.Tab
DynamoDB documentation描述了表分区原则上是如何工作的,但它对细节(即数字)非常了解。 DynamoDB 表分区究竟如何以及何时发生? 最佳答案 我找到了这个 presentati
我在 DynamoDB 表中有一个项目。该项目看起来像这样: { data: [ 1, 2, 3, 4, 5, 6 ] more_data: [ 2, 3, 4, 5, 6, 7 ] } 使用
我的 DynamoDB 表中的一个属性是一个名为 REQUEST_IDS 的列表,我想在更新项目之前检查该列表的长度以查看它是否满足条件(小于 10) .如何在 nodejs 的 ConditionE
我正在使用 Amazon Dynamodb,但没有太多经验。我有这样的价格表: 编号 |插入日期 |产品名称 |店名 |价格 相同的商店名称和产品名称可以有不同的值(价格和插入日期可能不同)。例如 i
我对 DynamoDB 上查询/扫描的限制有疑问。 我的表有 1000 条记录,对所有记录的查询返回 50 个值,但是如果我将 Limit 设置为 5,这并不意味着查询将返回前 5 个值,它只是说查询
我需要在 jsp 上显示最大计数为 10 的搜索结果,并且它应该有一个分页来作为分页功能来回遍历。 Dynamodb 有一个 最后评估 key ,但返回上一页无济于事,尽管我可以通过 移动到下一个结果
我是 CouchDB 的忠实粉丝,并且完全爱上了每个文档发出不止一次的 map 函数。我想知道在 DynamoDB 中是否可以通过使用字符串或数字集类型作为散列和范围主键的一部分(作为散列或范围属性)
我目前正在使用 DynamoDB。如果该记录的日期早于新记录日期字段,我想使用条件写入来更新记录。 有没有办法比较条件写入的 DateTime 类型?还是目前仅适用于整数、字符串和流? 谢谢。 最佳答
如何对 dynamoDB 表建模以构建一个标签系统,其中产品可以分配多个标签,并且我们应该能够过滤具有特定标签或标签集合的产品集,并获取分配给给定标签的所有标签产品? 我考虑过有一张 table :
我试图更好地理解在 AWS DynamoDB 中使用邻接列表模式进行多对多 (m:n) 关系设计。 在此处查看 AWS 文档:https://docs.aws.amazon.com/amazondyn
我怀疑 DynamoDB 中的这一说法是否属实或我的理解不正确。它说, ProvisionedThroughputExceededException 消息:您超出了表或一个或多个全局二级索引的最大允许
考虑一个 DynamoDB 表,它由一个主键和两个描述开始日期和结束日期的属性组成。如何在不扫描整个表的情况下查询时间范围是否与表中的时间范围重叠? 例子: 发电机表有两条记录 PK Start
我有一个 DynamoDB 表,其中包含将由许多应用程序读取的键值对。在启动时,每个应用程序将读取整个表并将其缓存在内存中。 我试图解决的问题是,如果 DynamoDB 表中的一个或多个项目已被修改,
我正在有条件地更新 dynamoDB 记录(仅当记录具有其属性之一的特定值时)。无论是否成功更新(条件是否满足),我都想取回记录。 docClient.update(params, function(
我目前正在对 DynamoDB 进行批量加载并将我们的数据项划分为批处理单元: 根据限制文件: https://docs.aws.amazon.com/amazondynamodb/latest/AP
我正在跟踪dynamodb的Python教程,以在端口8000上设置本地dynomodb http://docs.aws.amazon.com/amazondynamodb/latest/gettin
我正在创建一个 DynamoDB 表来保存与单个对象关联的注释。 评论在特定时间发布到对象,我使用发布的时间作为范围,因此评论可以按时间降序排序。我有发布评论的用户的 userId 的全局二级索引,这
我正在运行一个简单的 api,它在每次调用时从 dynamodb 表中获取一个项目,我将自动缩放设置为最小值 25 和最大值 10 000。 但是,如果我使用 wrk 或 hey 之类的工具发送 15
我在模型中有一个字段已声明为字符串,如下所示: App.Student= DS.Model.extend({ name: DS.attr('string'), address1: DS.attr('s
我是一名优秀的程序员,十分优秀!