- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
在 Spark Structured Streaming(Java 中)中读取分区数据时是否可以设置 basePath
选项?我只想加载特定分区中的数据,例如 basepath/x=1/
,但我还希望将 x
作为列加载。按照我对非流数据帧的方式设置 basePath
似乎不起作用。
这是一个最小的例子。我有一个包含以下数据的数据框:
+---+---+
| a| b|
+---+---+
| 1| 2|
| 3| 4|
+---+---+
我将其作为 Parquet 文件写入名为 x=1
的子目录。
以下代码(使用常规非流数据帧)工作正常:
Dataset<Row> data = sparkSession.read()
.option("basePath", basePath)
.parquet(basePath + "/x=1");
data.show();
这会产生预期的结果:
+---+---+---+
| a| b| x|
+---+---+---+
| 1| 2| 1|
| 3| 4| 1|
+---+---+---+
但是,以下(使用结构化流式处理 API)不起作用:
StructType schema = data.schema(); // data as defined above
Dataset<Row> streamingData = sparkSession.readStream()
.schema(schema)
.option("basePath", basePath)
.parquet(basePath + "/x=1");
streamingData.writeStream()
.trigger(Trigger.Once())
.format("console")
.start().awaitTermination();
在这种情况下,数据框不包含任何行:
+---+---+---+
| a| b| x|
+---+---+---+
+---+---+---+
最佳答案
我不确定这是否适用于 Spark Streaming,但它适用于我在 Scala 中的批处理。我会做的是完全避免使用 basePath
。例如,当我的数据按年/月/日进行分区时,我想每天循环处理,我会使用字符串插值。
import java.text.SimpleDateFormat
import java.sql.Timestamp
import java.util.Calendar
var dateStart: String = "01/14/2012"
var dateStop: String = "01/18/2012"
var format: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy");
var d1 = new Timestamp(format.parse(dateStart).getTime());
var d2 = new Timestamp(format.parse(dateStop).getTime());
var diffDays:Long = (d2.getTime() - d1.getTime()) / (24 * 60 * 60 * 1000)
var cal:Calendar = Calendar.getInstance()
cal.setTimeInMillis(d1.getTime())
for (i <- 0 to diffDays.toInt){
val year = cal.get(Calendar.YEAR)
val month = cal.get(Calendar.MONTH)
val day = cal.get(Calendar.DAY_OF_MONTH)
var dataframe1 = spark.read
.load(s"s3://bucketName/somepath/year=$year/month=$month/day=$day")
/*
Do your dataframe manipulation here
*/
cal.add(Calendar.DAY_OF_YEAR, 1)
}
您也可以使用字符串或整数列表来执行此操作。如果您需要将该数据视为一列,您始终可以将其作为新列附加到数据框中。不过,我不确定这是否适用于您使用 Spark 流式传输的情况。
关于java - 在 Spark Structured Streaming 中指定 "basePath"选项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49041300/
这是我第一次在结构中使用结构。我在编译我的程序时遇到了这个错误。错误:字段“结果”的类型不完整。 错误是指这行代码。-->结构result_t结果; 有什么帮助吗? :)谢谢。 typedef str
typedef struct mensagem { int sender ; int receiver ; char *text ; } *Item ; typedef str
我正在使用 ExpressionEngine 和 Structure 附加组件的最新版本。 我正在寻找有关生成 4 项导航栏的帮助,其中两项位于不同的结构级别。 我的结构行如下所示: 服务(父) --
我正在处理一个非常大的数据集。本质上,我将处理数百万条记录并将值存储到数据集中。 每次我存储一个值时,我必须首先检查以确保该值不在数据结构中。如果值在数据结构中,我必须更新(或删除/添加)记录以更新计
我正在尝试分别使用视频帧和音频来分析视频,我想出了一个看起来像这样的模型 现在,我将训练数据分成两个生成器 - 一个用于视频,一个用于音频。我必须进一步将生成器分成两半,我认为这是我遇到错误的地方。因
我有一个创建 N 个进程的程序,每个进程创建 M 个线程。 我还有一个结构需要传递给线程函数。 当我像这样创建 M 个线程时: thread_args_t** thread_arg = malloc(
我正在试图弄清楚如何实现一个等待事件发出信号的函数。指针由DLL函数返回,该函数是存储3个项的结构。其中两个是句柄,它们只是指针,最后是一些随机的未使用的指针。我真的不确定这应该如何格式化,因为我两个
根据PLCOpen、IEC-61131标准,是否可以在声明中初始化结构体? 我正在考虑类似于 this C++ question 的事情. 最佳答案 您可以在结构声明时向结构变量添加默认值。您还可以在
已关闭。这个问题是 not reproducible or was caused by typos 。目前不接受答案。 这个问题是由拼写错误或无法再重现的问题引起的。虽然类似的问题可能是 on-top
在纯 C 中工作,将结构嵌套在其他结构或指向结构的指针中更好。使用指针可以更容易地实现良好的对齐,但是访问内部结构需要额外的取消引用。只是具体地说: typedef struct {
我正在使用 Qt Creator 开发应用程序。 我不是一个好的C++程序员,所以可能会有概念上的错误等。 我在复制结构数组并返回结构时遇到问题。 有很多与类似标题相关的解决方案,但无法解决我的问题。
我正在尝试使用带水印的 dropDuplicate 函数对流数据进行重复数据删除。我目前面临的问题是我必须为给定记录设置两个时间戳 一个是事件时间戳 - 从源创建记录的时间戳。 另一个是传输时间戳 -
很难说出这里问的是什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或言辞激烈,无法以目前的形式合理回答。如需帮助澄清此问题以便可以重新打开,visit the help center . 10年前关
我尝试构建一个嵌套循环,用于创建 2 维零矩阵来解决 LCS 问题(动态规划)。这后来用于计算 Rouge-L 分数(输入是张量,而不是字符串),但它总是出错引发 ValueError: The tw
我曾多次使用 HDFS 和 Kafka,我注意到 Kafka 比 HDFS 更可靠。因此,现在使用 Spark-structured-streaming 时,我很惊讶检查点仅适用于 HDFS。使用 K
C11,6.7.2.1 结构和 union 说明符,约束,3(添加了强调): A structure or union shall not contain a member with incomple
在 emacs lisp 中,各种树结构是常见的。 custom.el通过:type提供论据 defcustom定义自定义变量的预期形状的标准方法。但是有没有一种标准的方法来验证一些随机 emacs
我在网上遇到了以下面试问题。 描述一个数据结构,其中 getValue(int index)、setValue(int index, int value) 和 setAllValues(int val
我正在使用 sqldf 对一个巨大的文件进行子集化。以下命令为我提供了一个 100 行和 42 列的 data.frame。 first <- read.csv.sql("first.txt", se
来自这里的 C++ 背景。我需要为我的一门类(class)编写 C 语言,但我从未接触过这一类(class)。这两个声明之间有什么区别?为什么要包含 struct 关键字?有不同的含义吗?它们在 C+
我是一名优秀的程序员,十分优秀!