作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我通过以下代码从kafka服务器获取日志:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", mykey.Kafka_source)
.option("subscribe", mykey.Kafka_topic)
.load();
Dataset<String> dg = df
.selectExpr("CAST(value AS STRING)")
.as(STRING());
然而,dg 的一个元素是这样的“姓名:John Doe,年龄:20”,但它只有一个键“值”。因此,当我将其保存在 HDFS 中时,它的保存方式如下:“值:”姓名:John Doe,年龄:22”。但是,我想像这样更改架构:
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
这样元素就被保存为“姓名:John Doe,年龄:22”
当前元素的架构如下:
root
|-- value: string (nullable = true)
我尝试编写代码将 dg 的每个元素转换为 Dataset 的新元素,但我认为 Java 中的结构化流不支持高级函数表达式。我怎样才能做到这一点..?我想要一些使用 StructType 的解决方案。
最佳答案
您只需将值
转换为预期的架构即可。
如果值采用 JSON 格式,您可以使用 from_json 之一标准功能:
from_json(e: Column, schema: Column): Column
对于其他格式,您必须应用转换(带或不带 UDF)来进行转换。
关于java - 如何将 Kafka 数据源中的值转换为给定的模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57163516/
我是一名优秀的程序员,十分优秀!