
apache-spark - Why does the streaming aggregation always get delayed until two batches of data?


I am using Spark 2.3.0.

My problem is that whenever I add a third batch of data to my input directory, the first batch gets processed and printed to the console. Why?

val spark = SparkSession
  .builder()
  .appName("micro1")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .config("spark.sql.parquet.cacheMetadata", "false")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._
import java.util.Calendar // required by Calendar.getInstance() used below

// Left side of a join
import org.apache.spark.sql.types._
val mySchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("year", IntegerType)
  .add("rating", DoubleType)
  .add("duration", IntegerType)
val xmlData = spark
  .readStream
  .option("sep", ",")
  .schema(mySchema)
  .csv("tostack")

// Right side of a join
val mappingSchema = new StructType()
  .add("id", StringType)
  .add("megavol", StringType)
val staticData = spark
  .read
  .option("sep", ",")
  .schema(mappingSchema)
  .csv("input_tost_static.csv")

xmlData.createOrReplaceTempView("xmlupdates")
staticData.createOrReplaceTempView("mappingdata")

// Stream-static join followed by a windowed aggregation, written to the console in append mode
spark
  .sql("select * from xmlupdates a join mappingdata b on a.id=b.id")
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp(), Calendar.getInstance().getTimeZone().getID()))
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(
    sum($"rating") as "rating",
    sum($"duration") as "duration",
    sum($"megavol") as "sum_megavol")
  .drop("window")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

My output shows the data as follows: I started the streaming query first and later added data into the particular folder. When I add my third file, the first file's aggregated results get printed. Why?

-------------------------------------------
Batch: 0
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
+----+------+--------+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
+----+------+--------+-----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
|1963| 2.8| 5126| 46.0|
|1921| 6.0| 15212| 3600.0|
+----+------+--------+-----------+

The input data is as follows:

1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733

The input_tost_static.csv dataset is as follows:

3,3000
4,600
5,46
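
For reference, the inner join only keeps ids 3, 4 and 5 (the only ids present in input_tost_static.csv), which is exactly what the aggregated output above reflects:

    id 3 -> year 1921: rating 3.2, duration 9062, megavol 3000
    id 4 -> year 1921: rating 2.8, duration 6150, megavol 600   => 1921: rating 6.0, duration 15212, sum_megavol 3600.0
    id 5 -> year 1963: rating 2.8, duration 5126, megavol 46    => 1963: rating 2.8, duration 5126,  sum_megavol 46.0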

Can someone help explain why Spark Structured Streaming behaves this way? Do I need to add any settings here? Update: if I try to print the val before the JOIN operation, I get results in Batch 1 itself. The problem only shows up after the join, where the output is delayed by more than 3 batches.
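
For reference, a minimal sketch of what "printing the val before the JOIN" could look like (the query name below is only illustrative): a second console sink on the raw stream has no watermark and no aggregation, so in append mode it prints rows in the same micro-batch in which they arrive.

// Hypothetical debug sink on the pre-join stream (sketch): with no watermark and
// no aggregation, append mode emits the rows as soon as they are read.
xmlData
  .writeStream
  .outputMode("append")
  .format("console")
  .queryName("raw_xml_debug") // illustrative name only
  .start()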

Best Answer

I have started the streaming first

Batch: 0 is executed right after you start the query; since no events have been streamed yet, there is no output.

At this point, the event-time watermark has not been set at all.
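
One way to confirm this (a minimal diagnostic sketch, assuming the streaming query above is the only active one in the session) is to look at the watermark Spark reports after each micro-batch:

import org.apache.spark.sql.streaming.StreamingQuery

// Grab the running query started above (assumption: it is the only active query).
val query: StreamingQuery = spark.streams.active.head

// lastProgress describes the most recent micro-batch; its eventTime map carries the
// current "watermark" (plus "min"/"max"/"avg" of event_time once data has been seen).
// Right after Batch 0, with no events processed, the watermark is still at the epoch.
Option(query.lastProgress).foreach { p =>
  println(s"batch=${p.batchId} watermark=${p.eventTime.get("watermark")}")
}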

and later added data in to the particular folder.

That was likely Batch: 1.

The event-time watermark was then set to current_timestamp. In order to get any output, the query has to wait "10 seconds" (per withWatermark("event_time", "10 seconds")).
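
A hedged worked timeline (all wall-clock times below are hypothetical, not taken from the question) of how that plays out in append mode, given that the watermark is computed as the maximum event_time seen minus 10 seconds and is only advanced at the start of the following micro-batch:

    Batch 1 (~12:00:05): rows get event_time 12:00:05 and fall into window [12:00:00, 12:00:10).
                         The watermark is still at its initial value, so nothing is emitted.
    Batch 2 (~12:00:30): the watermark advances to 12:00:05 - 10s = 11:59:55, still before 12:00:10,
                         so the first window is held back; the new rows open window [12:00:30, 12:00:40).
    Batch 3 (~12:01:00): the watermark advances to 12:00:30 - 10s = 12:00:20, past 12:00:10,
                         so the first window is finally emitted, two batches after its data arrived.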

when i add my third file the first file aggregated results are getting printed. Why?

That was likely Batch: 2.

My assumption is that the next time you added new files, it was after the previous current_timestamp + "10 seconds", and that is why you got the output.

Note that the watermark could be 0, which would mean that no late data is expected.
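
A minimal sketch of that variant, assuming the same temp views registered above; only the watermark delay changes. Whether results then show up a batch earlier still depends on when the next batch's event_time lands relative to a window's end, because the watermark is only advanced at the start of the following micro-batch.

// Sketch: same join and aggregation as in the question, but with a 0-second
// watermark delay, i.e. no late data is tolerated at all.
spark
  .sql("select * from xmlupdates a join mappingdata b on a.id=b.id")
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp(), Calendar.getInstance().getTimeZone().getID()))
  .withWatermark("event_time", "0 seconds") // was "10 seconds"
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(
    sum($"rating") as "rating",
    sum($"duration") as "duration",
    sum($"megavol") as "sum_megavol")
  .drop("window")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()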

Regarding apache-spark - Why does the streaming aggregation always get delayed until two batches of data?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54378219/
