- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试将数据与 DataFrame 连接起来,而 DataFrame 又是由左连接产生的。虽然在批处理中这按预期工作,但在流处理中一些条目丢失了......
下面我创建了一个“ session ”的最小示例,它具有“开始”和“结束”事件以及可选的一些“元数据”。
该脚本生成两个输出:sessionStartsWithMetadata
来自与“元数据”事件左连接的“开始”事件,基于 sessionId
。使用“左连接”,因为即使不存在相应的元数据,我们也希望获得输出事件。
此外,DataFrame endedSessionsWithMetadata
是通过将“结束”事件连接到先前创建的 DataFrame 来创建的。这里使用了“内部连接”,因为我们只需要在 session 确定结束时输出一些内容。
这段代码可以在spark-shell
中执行:
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
// Main data processing, regardless whether batch or stream processing
def process(
sessionStartEvents: DataFrame,
sessionOptionalMetadataEvents: DataFrame,
sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
val sessionStartsWithMetadata: DataFrame = sessionStartEvents
.join(
sessionOptionalMetadataEvents,
sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
sessionStartEvents("sessionStartTimestamp").between(
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
),
"left" // metadata is optional
)
.select(
sessionStartEvents("sessionId"),
sessionStartEvents("sessionStartTimestamp"),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
)
val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
sessionEndEvents,
sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
sessionStartsWithMetadata("sessionStartTimestamp").between(
sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
sessionEndEvents("sessionEndTimestamp")
)
)
(sessionStartsWithMetadata, endedSessionsWithMetadata)
}
def streamProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {
val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionStartEventsStream.addData(sessionStartData)
val sessionStartEvents: DataFrame = sessionStartEventsStream
.toDS()
.toDF("sessionStartTimestamp", "sessionId")
.withWatermark("sessionStartTimestamp", "1 second")
val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
.toDS()
.toDF("sessionOptionalMetadataTimestamp", "sessionId")
.withWatermark("sessionOptionalMetadataTimestamp", "1 second")
val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionEndEventsStream.addData(sessionEndData)
val sessionEndEvents: DataFrame = sessionEndEventsStream
.toDS()
.toDF("sessionEndTimestamp", "sessionId")
.withWatermark("sessionEndTimestamp", "1 second")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
.select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
.select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
(sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}
def batchProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): Unit = {
val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
println("sessionStartsWithMetadata")
sessionStartsWithMetadata.show(100, truncate = false)
println("endedSessionsWithMetadata")
endedSessionsWithMetadata.show(100, truncate = false)
}
// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
(new Timestamp(1), 0),
(new Timestamp(2000), 1),
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionOptionalMetadata = Vector(
(new Timestamp(1), 0),
// session `1` has no metadata
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionEndData = Vector(
(new Timestamp(10000), 0),
(new Timestamp(11000), 1),
(new Timestamp(12000), 2),
(new Timestamp(30000), 10)
)
batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
在 ID 1
的示例 session 中没有元数据,因此相应的元数据列为 null
。
加入数据的主要功能在def process(…)
中实现,它使用批数据和流数据调用。
在批处理版本中,输出符合预期:
sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
+---------+-----------------------+--------------------------------+
endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
|1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
+---------+-----------------------+--------------------------------+-------------------+---------+
但是当相同的处理作为流处理运行时,endedSessionsWithMetadata
的输出不包含没有元数据的 session 1
的条目:
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
|sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
+-------------------------+---------+-----------------------+--------------------------------+
-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
|endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+
-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
谁能解释为什么在流处理中没有“元数据”(sessionId=1
) 的“结束”事件?我需要做什么才能让它出现在输出中?
非常感谢!
最佳答案
经过大量测试,环顾四周并重新阅读手册:
- It must be a bug in Spark.
- I note also this post in circulation: https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3Eand whilst global vs chained stream-stream joins are understood, thispoint imo to an issue for this type of processing.
- I ran on Spark Databricks 3.x to no avail.
关于scala - 在 Spark Structured Streaming 中将数据内部连接到左连接 DataFrame 时丢失条目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64503539/
这个问题已经有答案了: Declaring multiple object pointers on one line causes compiler error (5 个回答) 已关闭 6 年前。 l
我目前正在学习语言处理器,经常出现的一个话题是语法中元素的使用方向。从左到右或从右到左。 我理解这个概念,但似乎有很多方法可以编写这些规则,我不确定它们是否都相同。到目前为止我看到的是: 右/左递归,
我有一个很长的线性(分支不多)流程图,在 graphviz 中显示为要么太高而无法放在单个页面上,要么太宽(如果方向是从左到右) 是否有一种简单的方法可以让 graphviz 以从左到右,然后向下,然
我一直摸不着头脑,但运气不好。设计器有一个包含 3 栏的站点、两个侧边栏和一个主要内容区域。 专为桌面设计,左栏、主要内容、右栏。但是,在较小的设备上,我们希望首先堆叠主要内容。 所以通常情况下,你可
我想要从上到下和从左到右组织的 css block 。 为了更好地解释这是一张图片,其中包含我到目前为止所获得的内容以及我希望使用 CSS 实现的内容: 代码如下: HTML: 1 2 3 4 5
当我问this question时,答案之一(现已删除)建议Either类型对应Curry-Howard correspondence中的XOR而不是OR,因为它不能同时是Left和Right。 真相
我有一个程序,如果用户按住向左或向右箭头键, Angular 色会逐渐朝那个方向加速,并最终达到最大速度。松开按键后, Angular 色逐渐减速,直至完全停止。 我的右方向键没问题,但左方向键坏了。
今天很简单的一个。我有一个专栏,我们称之为标题,有一堆项目标题。我需要从“:”的左侧拉出所有内容并进行左/右修剪(稍后我将在连接中使用它,但我现在只需要一个包含新数据的列) .下面是当前列的示例: 这
我正在尝试将图表中的列与左侧对齐。默认情况下,它们位于中间。 我在 API 文档中找不到任何关于此的信息。 Here是一个 jsFiddle 测试。 最佳答案 在 highcharts api 中,您
左旋转进位和右旋转进位指令有哪些实际用途? 在我的汇编课上,我们无法想出一个有用的好例子。 最佳答案 如果您想将位从一个操作数移出并移入另一个操作数: SHL EAX, 1 ; mov
我有一个查询,它使用 eqjoin 从两个不同的表返回以下数据。我想将 left 和 right 结合起来,而不是执行 zip() (重写 name > 和 joined_at),我想将右侧对象的属性
我使用 firebase API。发送和检索消息。但是,我在尝试为发送者/检索者设置布局时遇到麻烦,以便消息将左/右对齐。目前我只有发送者/检索者都使用的一种布局,但不确定如何设置不同的布局。 pub
我的菜单基本上是一个水平项目滑动条。所有菜单项都有特定的默认 CSS 属性。我希望这些项目在到达主容器的中心时更改其大小和左/右边距,并在离开主容器(或位于主容器之外)时重置为默认值。请参阅我的原理图
我有一个引用表,在这个表中有 3 个字段(Id、User1、User2)。 User2 字段可以为空,但我们在不使用时使用 0。 当我执行下面的 Linq 查询时,User2 == 0 的记录不是结果
不知道如何解决这个问题。 我有两个表结果和受访者 我需要查明受访者表中是否有任何行具有completion =“Complete”,但它们的respondent_id(在结果表和受访者表中)不在结果表
我正在尝试访问三个表以获得类似这样的内容: +------+------+------+ | ITEM | PCS | CSS | +------+------+------+ | 1099 |
left 和 right join 有区别吗,下面的sql 语句结果一样,但是两者的性能是一样的吗? SELECT count(*) FROM writers RIGHT JOIN blogs O
当我使用 LEFT() 使用以下代码从数据库中获取值时 $select="SELECT LEFT(description,500) FROM tbl_news where id='$id'"; $qu
当我将鼠标悬停在水平导航菜单上的页面名称上时,相关子页面会 float 在下方。 目前这些显示居中,我如何对齐它们以便它们向左对齐(与导航菜单标题名称一致)。 你可以去看到这个 http://79.1
在下面的引导网格示例中,它使用 col-sm 和 col-sm-push/pull。col-sm-push 指定了左/右值(偏移量)。 我不太清楚它是如何工作的。 第二个 float 元素来到第一个
我是一名优秀的程序员,十分优秀!