
scala - Entries lost when inner-joining data to a left-joined DataFrame in Spark Structured Streaming

Reposted. Author: 行者123. Updated: 2023-12-04 03:50:21

I'm trying to join data with a DataFrame that is itself the result of a left join. In batch processing this works as expected, but in stream processing some entries are lost...

Below is a minimal example of "sessions". Each session has a "start" event, an "end" event and, optionally, some "metadata".

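The script produces two outputs. sessionStartsWithMetadata comes from left-joining the "start" events with the "metadata" events on sessionId, with the start time required to lie within ±1 second of the metadata time. A left join is used because we want an output event even when no corresponding metadata exists.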
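To illustrate the intended semantics of this left join, here is a model of it in plain Scala on in-memory collections, without Spark. It is a sketch under these assumptions: event times are epoch milliseconds, the `between` condition is an inclusive ±1000 ms window, the sample values are copied from the script's data, and `LeftJoinModel` / `leftJoinStartsWithMetadata` are hypothetical names.

```scala
// Plain-Scala model (no Spark) of the left join in `process`.
// LeftJoinModel and leftJoinStartsWithMetadata are hypothetical names.
object LeftJoinModel {
  // (eventTimeMillis, sessionId), mirroring the (Timestamp, Int) tuples in the script
  type Event = (Long, Int)

  // Keeps every start event. Metadata is attached when the sessionId matches and
  // the start time lies within +/- 1000 ms of the metadata time (inclusive on both
  // ends, as with Column.between).
  def leftJoinStartsWithMetadata(
      starts: Seq[Event],
      metadata: Seq[Event]
  ): Seq[(Int, Long, Option[Long])] =
    starts.flatMap { case (startTs, id) =>
      val matches: Seq[Long] = metadata.collect {
        case (metaTs, metaId) if metaId == id && startTs >= metaTs - 1000 && startTs <= metaTs + 1000 =>
          metaTs
      }
      // No matching metadata: keep the start row with None (null in Spark's output)
      val metas: Seq[Option[Long]] = if (matches.isEmpty) Seq(None) else matches.map(Some(_))
      metas.map(meta => (id, startTs, meta))
    }

  // Sample data copied from the script (timestamps as epoch milliseconds)
  val starts: Seq[Event]   = Vector((1L, 0), (2000L, 1), (2000L, 2), (20000L, 10))
  val metadata: Seq[Event] = Vector((1L, 0), (2000L, 2), (20000L, 10))

  def main(args: Array[String]): Unit =
    leftJoinStartsWithMetadata(starts, metadata).foreach(println)
}
```

Running `LeftJoinModel.main` prints one row per start event. Only session 1 has `None`, which corresponds to the `null` metadata column in the batch output.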

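In addition, the DataFrame endedSessionsWithMetadata is created by joining the "end" events to the previously created DataFrame, with the start time required to lie within the 10 seconds before the end time. An inner join is used here because we only want to output something once a session has definitely ended.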
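The inner join on the "end" events can also be modeled in plain Scala, without Spark. This sketch makes several assumptions: the left-join result is hard-coded from the batch output shown further down, the interval check is inclusive on both ends like `between`, and `InnerJoinModel` / `innerJoinEnds` are hypothetical names. The optional metadata value is simply carried along, so a `None` does not prevent session 1 from matching its end event. This matches what the batch run shows.

```scala
// Plain-Scala model (no Spark) of the inner join on the "end" events.
// InnerJoinModel and innerJoinEnds are hypothetical names.
object InnerJoinModel {
  // (sessionId, startMillis, optional metadataMillis): result of the left join
  type StartWithMeta = (Int, Long, Option[Long])
  // (endMillis, sessionId), mirroring sessionEndData
  type EndEvent = (Long, Int)

  // Inner join: a row is produced only if an end event exists for the session
  // and the start time lies in [end - 10000 ms, end] (inclusive, as with between).
  def innerJoinEnds(
      starts: Seq[StartWithMeta],
      ends: Seq[EndEvent]
  ): Seq[(Int, Long, Option[Long], Long)] =
    for {
      (id, startTs, meta) <- starts
      (endTs, endId)      <- ends
      if endId == id && startTs >= endTs - 10000 && startTs <= endTs
    } yield (id, startTs, meta, endTs)

  // Left-join result as shown in the batch output; session 1 has no metadata
  val startsWithMeta: Seq[StartWithMeta] =
    Vector((0, 1L, Some(1L)), (1, 2000L, None), (2, 2000L, Some(2000L)), (10, 20000L, Some(20000L)))
  val ends: Seq[EndEvent] =
    Vector((10000L, 0), (11000L, 1), (12000L, 2), (30000L, 10))

  def main(args: Array[String]): Unit =
    innerJoinEnds(startsWithMeta, ends).foreach(println)
}
```

All four sessions, including session 1 with `None`, satisfy the join condition in this model.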

The full script can be executed in spark-shell:

import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}

import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Main data processing, regardless of whether batch or stream processing
def process(
    sessionStartEvents: DataFrame,
    sessionOptionalMetadataEvents: DataFrame,
    sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
  val sessionStartsWithMetadata: DataFrame = sessionStartEvents
    .join(
      sessionOptionalMetadataEvents,
      sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
        sessionStartEvents("sessionStartTimestamp").between(
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
        ),
      "left" // metadata is optional
    )
    .select(
      sessionStartEvents("sessionId"),
      sessionStartEvents("sessionStartTimestamp"),
      sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
    )

  val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
    sessionEndEvents,
    sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
      sessionStartsWithMetadata("sessionStartTimestamp").between(
        sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
        sessionEndEvents("sessionEndTimestamp")
      )
  )

  (sessionStartsWithMetadata, endedSessionsWithMetadata)
}

def streamProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {

  val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionStartEventsStream.addData(sessionStartData)

  val sessionStartEvents: DataFrame = sessionStartEventsStream
    .toDS()
    .toDF("sessionStartTimestamp", "sessionId")
    .withWatermark("sessionStartTimestamp", "1 second")

  val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)

  val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
    .toDS()
    .toDF("sessionOptionalMetadataTimestamp", "sessionId")
    .withWatermark("sessionOptionalMetadataTimestamp", "1 second")

  val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionEndEventsStream.addData(sessionEndData)

  val sessionEndEvents: DataFrame = sessionEndEventsStream
    .toDS()
    .toDF("sessionEndTimestamp", "sessionId")
    .withWatermark("sessionEndTimestamp", "1 second")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
    .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
    .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}

def batchProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): Unit = {

  val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
  val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
  val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  println("sessionStartsWithMetadata")
  sessionStartsWithMetadata.show(100, truncate = false)

  println("endedSessionsWithMetadata")
  endedSessionsWithMetadata.show(100, truncate = false)
}


// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
  (new Timestamp(1), 0),
  (new Timestamp(2000), 1),
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionOptionalMetadata = Vector(
  (new Timestamp(1), 0),
  // session `1` has no metadata
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionEndData = Vector(
  (new Timestamp(10000), 0),
  (new Timestamp(11000), 1),
  (new Timestamp(12000), 2),
  (new Timestamp(30000), 10)
)

batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
  streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

In the example, the session with ID 1 has no metadata, so its metadata column is null.

The main join logic is implemented in def process(…), which is called with both batch and streaming data.

In the batch version, the output is as expected:

sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
|1        |1970-01-01 01:00:02    |null                            | ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
+---------+-----------------------+--------------------------------+

endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
|1        |1970-01-01 01:00:02    |null                            |1970-01-01 01:00:11|1        | ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
+---------+-----------------------+--------------------------------+-------------------+---------+

But when the same processing runs as a stream, the endedSessionsWithMetadata output does not contain an entry for session 1, which has no metadata:

-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
|sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
+-------------------------+---------+-----------------------+--------------------------------+

-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
|endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+

-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                            | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+

-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
↳ ✘ here I would have expected a row with sessionId=1 that has "start" and "end" information but no "metadata" ✘
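Session 1 appears in Batch 1 rather than Batch 0. This delay is consistent with how Spark emits the null-padded rows of a stream-stream outer join. The Structured Streaming guide notes that these NULL results are generated with a delay that depends on the watermark delay and the time-range condition. The plain-Scala sketch below is a simplified model, not Spark's implementation. It makes several assumptions. All data is added before the queries start, so Batch 0 runs with the initial watermark of 0. The default `min` policy applies for `spark.sql.streaming.multipleWatermarkPolicy`. The names `OuterJoinDelayModel`, `globalWatermark` and `emittableWithNull` are hypothetical. The sketch only covers when the left join can emit session 1. It does not model what the downstream inner join does with that row.

```scala
// Simplified model (not Spark's implementation) of when a stream-stream left outer
// join can emit a null-padded row. OuterJoinDelayModel, globalWatermark and
// emittableWithNull are hypothetical names.
object OuterJoinDelayModel {
  val delayMillis  = 1000L // withWatermark(..., "1 second") on every input
  val windowMillis = 1000L // start BETWEEN meta - 1 s AND meta + 1 s

  // Per-input watermark after a batch: max event time seen so far minus the delay.
  def inputWatermark(eventTimes: Seq[Long]): Long = eventTimes.max - delayMillis

  // Assumes the default multiple-watermark policy "min": the query-wide
  // watermark is the smallest per-input watermark.
  def globalWatermark(inputs: Seq[Seq[Long]]): Long = inputs.map(inputWatermark).min

  // In this model, metadata arriving later has event time >= watermark, so it could
  // only match starts at or after (watermark - 1 s). Unmatched starts strictly before
  // that bound can no longer find metadata and can be emitted with null.
  def emittableWithNull(unmatchedStarts: Seq[(Long, Int)], watermark: Long): Seq[Int] =
    unmatchedStarts.collect { case (ts, id) if ts < watermark - windowMillis => id }

  def main(args: Array[String]): Unit = {
    val starts   = Seq(1L, 2000L, 2000L, 20000L)
    val metadata = Seq(1L, 2000L, 20000L)
    val ends     = Seq(10000L, 11000L, 12000L, 30000L)
    val session1 = Seq((2000L, 1)) // the only start without metadata

    // Batch 0 runs with the initial watermark 0: nothing can be null-padded yet.
    println(emittableWithNull(session1, watermark = 0L)) // List()

    // After Batch 0 the watermark is min(19000, 19000, 29000) = 19000 ms,
    // so the next batch can emit session 1 with null metadata.
    val wm = globalWatermark(Seq(starts, metadata, ends))
    println(wm)                                          // 19000
    println(emittableWithNull(session1, watermark = wm)) // List(1)
  }
}
```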


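Can anyone explain why, in stream processing, the "end" event for the session without "metadata" (sessionId=1) is missing? What do I need to do to make it appear in the output?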

Thanks a lot!

Best answer

After a lot of testing, looking around, and re-reading the manual:

A similar question about "scala - Entries lost when inner-joining data to a left-joined DataFrame in Spark Structured Streaming" can be found on Stack Overflow: https://stackoverflow.com/questions/64503539/
