- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在评估用于流处理的 Apache Flink 作为 Apache Spark 的替代/补充。我们通常使用 Spark 解决的任务之一是数据丰富。
即,我有来自带有传感器 ID 的 IoT 传感器的数据流,并且我有一组传感器元数据。我想将输入流转换为传感器测量+传感器元数据流。
在 Spark 中,我可以使用 RDD 加入 DStream。
case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
spark.read.json(...).as[SensorMetadata]
.map {s => (s.sensorId, s)}.rdd
val joined: DStream[(SensorValue, SensorMetadata)] =
sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] =>
rdd.join(staticMetadata)
.map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
}
val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
sensorInput.keyBy("sensorId")
.connect(staticMetadata.keyBy("sensorId"))
.flatMap {new RichCoFlatMapFunction() {
private val ValueState<SensorMetadata> md = _;
override def open = ??? // initiate value state
def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) =
collector.collect(s, md.value)
def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) =
md.update(s)
}}
最佳答案
使用 CoFlatMapFunction
加入是一种常见的方法。然而,它有一个明显的缺点。每当任一输入的元组到达并且您无法控制首先使用哪个输入时,就会调用该函数。因此,一开始,您必须在元数据尚未完全读取时处理传感器事件。一种方法是缓冲一个输入的所有事件,直到另一个输入被消耗。另一方面,CoFlatMapFunction
方法的好处是您可以动态更新元数据。在您的代码示例中,两个输入都在连接键上键入。这意味着输入是分区的,每个任务槽正在处理不同的 key 集。因此,您的元数据可能比机器可以处理的要大(如果您配置 RocksDB 状态后端,则状态可以持久化到磁盘,因此您甚至不受内存大小的限制)。
如果您要求在作业开始时所有元数据都必须存在,并且元数据是静态的(它不会改变)并且足够小以适合一台机器,您还可以使用常规 FlatMapFunction
并在 open()
中加载元数据文件中的方法。与您的方法相反,这将是广播连接,其中每个任务槽在内存中都有完整的元数据。除了在使用事件数据时所有元数据都可用之外,该方法的好处是您不需要对事件数据进行混洗,因为它可以在任何机器上加入。
关于join - 我可以使用 Flink state 来执行 join 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40101261/
我正在测试设置SQLAlchemy以映射现有数据库。这个数据库是很久以前自动建立的,它是由我们不再使用的先前的第三方应用程序创建的,因此 undefined 某些预期的事情,例如外键约束。该软件将管理
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (28 个答案) 关闭 7 年前。 INNE
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (29 个回答) 关闭7年前. INNER J
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
一.先看一些最简单的例子 例子 Table A aid adate 1 a1 2&nb
数据库操作语句 7. 外连接——交叉查询 7.1 查询 7.2 等值连接 7.3 右外
我有两个表 'users' 和 'lms_users' class LmsUser belongs_to :user end class User has_one :lms_user
我试图避免在 Rails 中对我的 joins 进行字符串插值,因为我注意到将查询器链接在一起时灵活性会降低。 也就是说,我觉得 joins(:table1) 比 joins('inner join
我有这个代码 User.find(:all, :limit => 10, :joins => :user_points, :select => "users.*, co
我刚刚开始探索 Symfony2,我很惊讶它拥有如此多的强大功能。我开始做博客教程在: http://tutorial.symblog.co.uk/ 但使用的是 2.1 版而不是 2.0 我的问题是我
什么是 SQL JOIN什么是不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: http
我有两个 Hive 表,我正在尝试加入它们。这些表没有被任何字段聚集或分区。尽管表包含公共(public)键字段的记录,但连接查询始终返回 0 条记录。所有数据类型都是“字符串”数据类型。 连接查询很
我正在使用 Solr 的(4.0.0-beta)连接功能来查询包含具有父/子关系的文档的索引。连接查询效果很好,但我只能在搜索结果中获得父文档。我相信这是预期的行为。 但是,是否有可能在搜索结果中同时
我正在使用可用的指南/api/书籍自学 Rails,但我无法理解通过三种方式/嵌套 has_many :through 关联进行的连接。 我有用户与组相关联:通过成员(member)资格。 我在多对多
什么是 SQL JOIN,有哪些不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: htt
我正在尝试访问数据库的两个表。在商店里,我保留了一个事件列表,其中包含 Table Event id, name,datei,houri, dateF,Hourf ,capacity, age ,de
我有 4 个表:booking、address、search_address 和 search_address_log 表:(相关列) 预订:(pickup_address_id, dropoff_a
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我是一名优秀的程序员,十分优秀!