gpt4 book ai didi

java - 在 Spark Structured Streaming 中指定 "basePath"选项

转载 作者:搜寻专家 更新时间:2023-11-01 03:46:41 24 4
gpt4 key购买 nike

在 Spark Structured Streaming(Java 中)中读取分区数据时是否可以设置 basePath 选项?我只想加载特定分区中的数据,例如 basepath/x=1/,但我还希望将 x 作为列加载。按照我对非流数据帧的方式设置 basePath 似乎不起作用。

这是一个最小的例子。我有一个包含以下数据的数据框:

+---+---+
| a| b|
+---+---+
| 1| 2|
| 3| 4|
+---+---+

我将其作为 Parquet 文件写入名为 x=1 的子目录。

以下代码(使用常规非流数据帧)工作正常:

Dataset<Row> data = sparkSession.read()
.option("basePath", basePath)
.parquet(basePath + "/x=1");

data.show();

这会产生预期的结果:

+---+---+---+
| a| b| x|
+---+---+---+
| 1| 2| 1|
| 3| 4| 1|
+---+---+---+

但是,以下(使用结构化流式处理 API)不起作用:

StructType schema = data.schema(); // data as defined above

Dataset<Row> streamingData = sparkSession.readStream()
.schema(schema)
.option("basePath", basePath)
.parquet(basePath + "/x=1");

streamingData.writeStream()
.trigger(Trigger.Once())
.format("console")
.start().awaitTermination();

在这种情况下,数据框不包含任何行:

+---+---+---+
| a| b| x|
+---+---+---+
+---+---+---+

最佳答案

我不确定这是否适用于 Spark Streaming,但它适用于我在 Scala 中的批处理。我会做的是完全避免使用 basePath。例如,当我的数据按年/月/日进行分区时,我想每天循环处理,我会使用字符串插值。

import java.text.SimpleDateFormat
import java.sql.Timestamp
import java.util.Calendar

var dateStart: String = "01/14/2012"
var dateStop: String = "01/18/2012"

var format: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy");


var d1 = new Timestamp(format.parse(dateStart).getTime());
var d2 = new Timestamp(format.parse(dateStop).getTime());

var diffDays:Long = (d2.getTime() - d1.getTime()) / (24 * 60 * 60 * 1000)

var cal:Calendar = Calendar.getInstance()
cal.setTimeInMillis(d1.getTime())
for (i <- 0 to diffDays.toInt){
val year = cal.get(Calendar.YEAR)
val month = cal.get(Calendar.MONTH)
val day = cal.get(Calendar.DAY_OF_MONTH)
var dataframe1 = spark.read
.load(s"s3://bucketName/somepath/year=$year/month=$month/day=$day")
/*
Do your dataframe manipulation here
*/
cal.add(Calendar.DAY_OF_YEAR, 1)
}

您也可以使用字符串或整数列表来执行此操作。如果您需要将该数据视为一列,您始终可以将其作为新列附加到数据框中。不过,我不确定这是否适用于您使用 Spark 流式传输的情况。

关于java - 在 Spark Structured Streaming 中指定 "basePath"选项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49041300/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com