- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在评估用于流处理的 Apache Flink 作为 Apache Spark 的替代/补充。我们通常使用 Spark 解决的任务之一是数据丰富。
即,我有来自带有传感器 ID 的 IoT 传感器的数据流,并且我有一组传感器元数据。我想将输入流转换为传感器测量+传感器元数据流。
在 Spark 中,我可以使用 RDD 加入 DStream。
case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
spark.read.json(...).as[SensorMetadata]
.map {s => (s.sensorId, s)}.rdd
val joined: DStream[(SensorValue, SensorMetadata)] =
sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] =>
rdd.join(staticMetadata)
.map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
}
val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
sensorInput.keyBy("sensorId")
.connect(staticMetadata.keyBy("sensorId"))
.flatMap {new RichCoFlatMapFunction() {
private val ValueState<SensorMetadata> md = _;
override def open = ??? // initiate value state
def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) =
collector.collect(s, md.value)
def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) =
md.update(s)
}}
最佳答案
使用 CoFlatMapFunction
加入是一种常见的方法。然而,它有一个明显的缺点。每当任一输入的元组到达并且您无法控制首先使用哪个输入时,就会调用该函数。因此,一开始,您必须在元数据尚未完全读取时处理传感器事件。一种方法是缓冲一个输入的所有事件,直到另一个输入被消耗。另一方面,CoFlatMapFunction
方法的好处是您可以动态更新元数据。在您的代码示例中,两个输入都在连接键上键入。这意味着输入是分区的,每个任务槽正在处理不同的 key 集。因此,您的元数据可能比机器可以处理的要大(如果您配置 RocksDB 状态后端,则状态可以持久化到磁盘,因此您甚至不受内存大小的限制)。
如果您要求在作业开始时所有元数据都必须存在,并且元数据是静态的(它不会改变)并且足够小以适合一台机器,您还可以使用常规 FlatMapFunction
并在 open()
中加载元数据文件中的方法。与您的方法相反,这将是广播连接,其中每个任务槽在内存中都有完整的元数据。除了在使用事件数据时所有元数据都可用之外,该方法的好处是您不需要对事件数据进行混洗,因为它可以在任何机器上加入。
关于join - 我可以使用 Flink state 来执行 join 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40101261/
我正在编写一个简单的有限状态机,并意识到在某些情况下,事件可以将状态变为多个可能的结果。基本上,从状态 A,如果事件 E 发生,状态可能是 C 或 D。 我目前正在使用此处编写的 Javascript
我在 React 中构建了一个应用程序,我在其中一个样板项目中找到了一行。 (state = {}) => state 谁能给我解释一下上面这行是什么意思?它是 javascript ES6 标准。
如何将多个状态变量组合成另一个? 我想通过一些用户交互来更改高度或宽度的值,并相应地更新 View 中的所有内容。所以高度或宽度会改变,面积也会改变。 我想它看起来像这样 @State var wid
我的容器正在通过 redux store 获取状态。 我通过这样的 Prop 将这个状态传递给模态框:示例: render(){ let {team,id} =this.props.data;
您好,我正在尝试使用 map 根据我所在状态的数组渲染选项,但在返回中使用它时我得到未定义 这是数组 this.state = { countries: ["Australia","Brazil"
我想将 this.state.currentPlayer 分配给 this.state.whosPlaying。它抛出错误TypeError:无法读取新板上未定义的属性“currentPlayer”。
我正在实现某种动态工作流程,当达到某个点时,我必须重新加载状态以呈现 HTML 并重新实例化 Controller 才能继续。 我发现我第二次调用 $state.reload() 不起作用。这是期望的
我正在开发一个 flutter 应用程序,并发现状态管理出现意外行为。我创建了一个示例应用来重现该行为,您可以在下面找到代码和日志输出。 该应用程序包含一个简单的 ListView,其中包含 10 个
有人可以举一个简单的例子,其中 state monad 比直接传递 state 更好吗? bar1 (Foo x) = Foo (x + 1) 对比 bar2 :: State Foo Foo bar
我想写类似 $state.go("/spheres/{{$stateParams.sphereId}}/mono/view"); 的内容使用外部 url 而不是状态,但这不起作用:( 现在我明白为什么
我正在使用“angular-ui-tree”:“^2.22.5” 点击执行某事菜单项时出错.. TypeError: this.$state is undefined 如何将对 $state 的引用传
我在elasticsearch中有文本字段,我想在kibana上可视化词云... 第一步,我们需要标记它们,我使用了“标准标记器” ... 使用这种形式的词云可视化结果如下图所示: 但是我需要的是专有
我正在尝试以编程方式在状态之间移动(使用 ui.router),而用户无需单击任何内容。文档位于 http://angular-ui.github.io/ui-router/site/#/api/ui
我想编写像“(event, state) -> state”这样的折叠函数。如果Java中没有任何模式匹配且不可变,我该如何编写它? 最佳答案 我认为您正在寻找 Java 中的函数式编程。 此版本中引
这个问题已经有答案了: What does an exclamation mark before a variable mean in JavaScript (4 个回答) 已关闭 8 年前。 您好,
https://plnkr.co/edit/bOZW1a9u62W1QA6cYjYj?p=preview 预期 登录后,所有 $states 都会初始化,然后单击 Ticker 按钮后,唯一应重新初始
试图决定(针对我的应用程序)在 onPause() 中保存什么以及要保存在 onSaveInstanceState() 中的内容,我梳理了整个 SO 以获得提示和明确的指导方针。 如果我没理解错的话,
在 Javascript 中,当我单击滚动条(页面中出现的任何滚动条)并将鼠标悬停在图像上时,图像再次开始拖动。 图像只能在鼠标按钮按下状态下拖动。 所以我试图通过了解鼠标按钮状态(mousedown
我见过 Maybe和 Either在代码中使用仿函数(和应用)是有道理的,但我很难想出一个 State 的例子。仿函数和应用。也许它们不是很有用,只是因为 State 才存在。 monad 需要一个仿
我非常努力地想围绕 State Monad,但我不明白以下内容: 鉴于 return 的实现和 (>>=) ,当你说 State $ \s ->.... ,在哪里s来自?我的意思是,当你开始表演时 >
我是一名优秀的程序员,十分优秀!