gpt4 book ai didi

join - 我可以使用 Flink state 来执行 join 吗?

转载 作者:行者123 更新时间:2023-12-04 19:35:34 26 4
gpt4 key购买 nike

我正在评估用于流处理的 Apache Flink 作为 Apache Spark 的替代/补充。我们通常使用 Spark 解决的任务之一是数据丰富。

即,我有来自带有传感器 ID 的 IoT 传感器的数据流,并且我有一组传感器元数据。我想将输入流转换为传感器测量+传感器元数据流。

在 Spark 中,我可以使用 RDD 加入 DStream。

case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
spark.read.json(...).as[SensorMetadata]
.map {s => (s.sensorId, s)}.rdd
val joined: DStream[(SensorValue, SensorMetadata)] =
sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] =>
rdd.join(staticMetadata)
.map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
}

我可以用 Apache Flink 做同样的事情吗?我看不到这方面的直接 API。我唯一的想法是使用有状态转换 - 我可以在单个流中合并元数据和传感器事件,并使用 Flink 状态存储来存储元数据(伪代码):

val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
sensorInput.keyBy("sensorId")
.connect(staticMetadata.keyBy("sensorId"))
.flatMap {new RichCoFlatMapFunction() {
private val ValueState<SensorMetadata> md = _;
override def open = ??? // initiate value state
def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) =
collector.collect(s, md.value)
def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) =
md.update(s)
}}

这是正确的方法吗?当元数据不适合一台机器时,我可以在更大规模下使用吗?

谢谢

最佳答案

使用 CoFlatMapFunction加入是一种常见的方法。然而,它有一个明显的缺点。每当任一输入的元组到达并且您无法控制首先使用哪个输入时,就会调用该函数。因此,一开始,您必须在元数据尚未完全读取时处理传感器事件。一种方法是缓冲一个输入的所有事件,直到另一个输入被消耗。另一方面,CoFlatMapFunction方法的好处是您可以动态更新元数据。在您的代码示例中,两个输入都在连接键上键入。这意味着输入是分区的,每个任务槽正在处理不同的 key 集。因此,您的元数据可能比机器可以处理的要大(如果您配置 RocksDB 状态后端,则状态可以持久化到磁盘,因此您甚至不受内存大小的限制)。

如果您要求在作业开始时所有元数据都必须存在,并且元数据是静态的(它不会改变)并且足够小以适合一台机器,您还可以使用常规 FlatMapFunction并在 open() 中加载元数据文件中的方法。与您的方法相反,这将是广播连接,其中每个任务槽在内存中都有完整的元数据。除了在使用事件数据时所有元数据都可用之外,该方法的好处是您不需要对事件数据进行混洗,因为它可以在任何机器上加入。

关于join - 我可以使用 Flink state 来执行 join 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40101261/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com