- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个Python脚本,它每分钟在一个新文件(单行)中从纽约证券交易所获取股票数据(如下)。它包含4只股票的数据 - MSFT、ADBE、GOOGL和FB,如下json格式
[{"symbol": "MSFT", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "126.0800", "high": "126.1000", "low": "126.0500", "close": "126.0750", "volume": "57081"}}, {"symbol": "ADBE", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "279.2900", "high": "279.3400", "low": "279.2600", "close": "279.3050", "volume": "12711"}}, {"symbol": "GOOGL", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "1166.4100", "high": "1166.7400", "low": "1166.2900", "close": "1166.7400", "volume": "8803"}}, {"symbol": "FB", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "192.4200", "high": "192.5000", "low": "192.3600", "close": "192.4800", "volume": "33490"}}]
我正在尝试将此文件流读入 Spark Streaming 数据帧。但我无法为其定义正确的架构。查看互联网并到目前为止完成了以下工作
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
public class Driver1 {
public static void main(String args[]) throws InterruptedException, StreamingQueryException {
SparkSession session = SparkSession.builder().appName("Spark_Streaming").master("local[2]").getOrCreate();
Logger.getLogger("org").setLevel(Level.ERROR);
StructType priceData = new StructType()
.add("open", DataTypes.DoubleType)
.add("high", DataTypes.DoubleType)
.add("low", DataTypes.DoubleType)
.add("close", DataTypes.DoubleType)
.add("volume", DataTypes.LongType);
StructType schema = new StructType()
.add("symbol", DataTypes.StringType)
.add("timestamp", DataTypes.StringType)
.add("stock", priceData);
Dataset<Row> rawData = session.readStream().format("json").schema(schema).json("/home/abhinavrawat/streamingData/data/*");
rawData.printSchema();
rawData.writeStream().format("console").start().awaitTermination();
session.close();
}
}
我得到的输出是这样的-
root
|-- symbol: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- stock: struct (nullable = true)
| |-- open: double (nullable = true)
| |-- high: double (nullable = true)
| |-- low: double (nullable = true)
| |-- close: double (nullable = true)
| |-- volume: long (nullable = true)
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------------------+-----+
|symbol| timestamp|stock|
+------+-------------------+-----+
| MSFT|2019-05-02 15:59:00| null|
| ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
| FB|2019-05-02 15:59:00| null|
| MSFT|2019-05-02 15:59:00| null|
| ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
| FB|2019-05-02 15:59:00| null|
| MSFT|2019-05-02 15:59:00| null|
| ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
| FB|2019-05-02 15:59:00| null|
| MSFT|2019-05-02 15:59:00| null|
| ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
| FB|2019-05-02 15:59:00| null|
| MSFT|2019-05-02 15:59:00| null|
| ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
| FB|2019-05-02 15:59:00| null|
+------+-------------------+-----+
我什至尝试首先将 json 字符串作为文本文件读取,然后应用架构(就像使用 Kafka-Streaming 完成的那样)...
Dataset<Row> rawData = session.readStream().format("text").load("/home/abhinavrawat/streamingData/data/*");
Dataset<Row> raw2 = rawData.select(org.apache.spark.sql.functions.from_json(rawData.col("value"),schema));
raw2.writeStream().format("console").start().awaitTermination();
获取以下输出,在本例中,rawData
数据帧作为字符串 fromat 中的 json 数据,
+--------------------+
|jsontostructs(value)|
+--------------------+
| null|
| null|
| null|
| null|
| null|
请帮我解决一下。
最佳答案
刚刚弄清楚,请记住以下两件事 -
定义架构时,请确保字段的名称和顺序与 json 文件中的完全相同。
最初,所有字段仅使用 StringType
,您可以应用转换将其更改回某种特定的数据类型。
这对我有用 -
StructType priceData = new StructType()
.add("open", DataTypes.StringType)
.add("high", DataTypes.StringType)
.add("low", DataTypes.StringType)
.add("close", DataTypes.StringType)
.add("volume", DataTypes.StringType);
StructType schema = new StructType()
.add("symbol", DataTypes.StringType)
.add("timestamp", DataTypes.StringType)
.add("priceData", priceData);
Dataset<Row> rawData = session.readStream().format("json").schema(schema).json("/home/abhinavrawat/streamingData/data/*");
rawData.writeStream().format("console").start().awaitTermination();
session.close();
查看输出 -
+------+-------------------+--------------------+
|symbol| timestamp| priceData|
+------+-------------------+--------------------+
| MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
| ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
| FB|2019-05-02 15:59:00|[192.4200, 192.50...|
| MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
| ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
| FB|2019-05-02 15:59:00|[192.4200, 192.50...|
| MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
| ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
| FB|2019-05-02 15:59:00|[192.4200, 192.50...|
| MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
| ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
| FB|2019-05-02 15:59:00|[192.4200, 192.50...|
| MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
| ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
| FB|2019-05-02 15:59:00|[192.4200, 192.50...|
+------+-------------------+--------------------+
您现在可以使用 priceData.open
、priceData.close
等展平价格数据列。
关于java - 无法读取json文件: Spark Structured Streaming using java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55979976/
这是我第一次在结构中使用结构。我在编译我的程序时遇到了这个错误。错误:字段“结果”的类型不完整。 错误是指这行代码。-->结构result_t结果; 有什么帮助吗? :)谢谢。 typedef str
typedef struct mensagem { int sender ; int receiver ; char *text ; } *Item ; typedef str
我正在使用 ExpressionEngine 和 Structure 附加组件的最新版本。 我正在寻找有关生成 4 项导航栏的帮助,其中两项位于不同的结构级别。 我的结构行如下所示: 服务(父) --
我正在处理一个非常大的数据集。本质上,我将处理数百万条记录并将值存储到数据集中。 每次我存储一个值时,我必须首先检查以确保该值不在数据结构中。如果值在数据结构中,我必须更新(或删除/添加)记录以更新计
我正在尝试分别使用视频帧和音频来分析视频,我想出了一个看起来像这样的模型 现在,我将训练数据分成两个生成器 - 一个用于视频,一个用于音频。我必须进一步将生成器分成两半,我认为这是我遇到错误的地方。因
我有一个创建 N 个进程的程序,每个进程创建 M 个线程。 我还有一个结构需要传递给线程函数。 当我像这样创建 M 个线程时: thread_args_t** thread_arg = malloc(
我正在试图弄清楚如何实现一个等待事件发出信号的函数。指针由DLL函数返回,该函数是存储3个项的结构。其中两个是句柄,它们只是指针,最后是一些随机的未使用的指针。我真的不确定这应该如何格式化,因为我两个
根据PLCOpen、IEC-61131标准,是否可以在声明中初始化结构体? 我正在考虑类似于 this C++ question 的事情. 最佳答案 您可以在结构声明时向结构变量添加默认值。您还可以在
已关闭。这个问题是 not reproducible or was caused by typos 。目前不接受答案。 这个问题是由拼写错误或无法再重现的问题引起的。虽然类似的问题可能是 on-top
在纯 C 中工作,将结构嵌套在其他结构或指向结构的指针中更好。使用指针可以更容易地实现良好的对齐,但是访问内部结构需要额外的取消引用。只是具体地说: typedef struct {
我正在使用 Qt Creator 开发应用程序。 我不是一个好的C++程序员,所以可能会有概念上的错误等。 我在复制结构数组并返回结构时遇到问题。 有很多与类似标题相关的解决方案,但无法解决我的问题。
我正在尝试使用带水印的 dropDuplicate 函数对流数据进行重复数据删除。我目前面临的问题是我必须为给定记录设置两个时间戳 一个是事件时间戳 - 从源创建记录的时间戳。 另一个是传输时间戳 -
很难说出这里问的是什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或言辞激烈,无法以目前的形式合理回答。如需帮助澄清此问题以便可以重新打开,visit the help center . 10年前关
我尝试构建一个嵌套循环,用于创建 2 维零矩阵来解决 LCS 问题(动态规划)。这后来用于计算 Rouge-L 分数(输入是张量,而不是字符串),但它总是出错引发 ValueError: The tw
我曾多次使用 HDFS 和 Kafka,我注意到 Kafka 比 HDFS 更可靠。因此,现在使用 Spark-structured-streaming 时,我很惊讶检查点仅适用于 HDFS。使用 K
C11,6.7.2.1 结构和 union 说明符,约束,3(添加了强调): A structure or union shall not contain a member with incomple
在 emacs lisp 中,各种树结构是常见的。 custom.el通过:type提供论据 defcustom定义自定义变量的预期形状的标准方法。但是有没有一种标准的方法来验证一些随机 emacs
我在网上遇到了以下面试问题。 描述一个数据结构,其中 getValue(int index)、setValue(int index, int value) 和 setAllValues(int val
我正在使用 sqldf 对一个巨大的文件进行子集化。以下命令为我提供了一个 100 行和 42 列的 data.frame。 first <- read.csv.sql("first.txt", se
来自这里的 C++ 背景。我需要为我的一门类(class)编写 C 语言,但我从未接触过这一类(class)。这两个声明之间有什么区别?为什么要包含 struct 关键字?有不同的含义吗?它们在 C+
我是一名优秀的程序员,十分优秀!