gpt4 book ai didi

scala - 如何使用 Apache flink 处理乱序事件?

转载 作者:行者123 更新时间:2023-12-05 02:02:14 25 4
gpt4 key购买 nike

为了测试流处理和 Flink,我给自己出了一个看似简单的问题。我的数据流由 x 组成和 y粒子随时间的坐标 t记录位置的位置。我的目标是用特定粒子的速度注释这些数据。所以流可能看起来像这样。

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

现在无法保证事件会按顺序到达,即 1612103771213 p2 -0.1 -0.1可能会说 10ms之前1612103771212 p2 0.0 0.0 .

为简单起见,可以假设任何迟到的数据都将在 100ms 内到达。的早期数据。

我承认我是流处理和 Flink 的新手,所以这可能是一个愚蠢的问题,但答案很明显,但我目前对如何实现我的目标感到困惑。

编辑

按照 David 的回答,我尝试使用 Flink Table API 对数据流进行排序,使用 nc -lk 9999用于文本套接字流。问题是在我关闭文本套接字流之前,控制台不会打印任何内容。这是我写的 scala 代码 -


package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration


object AnnotateJob {

val OUT_OF_ORDER_NESS = 100

def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

val tableEnv = StreamTableEnvironment.create(env, bSettings)

env.setParallelism(1)

// Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
val text = env.socketTextStream("localhost", 9999)
val objStream = text
.filter( _.nonEmpty )
.map(new ParticleMapFunction)

val posStream = objStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
.withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
})
)

val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
tableEnv.createTemporaryView("pos", tablePos)
val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")

val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

// sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)

sortedPosStream.print()

// execute program
env.execute()
}

case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable

class ParticleMapFunction extends MapFunction[String, ParticlePos] {
override def map(t: String): ParticlePos = {
val parts = t.split("\\W+")
ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
}
}

}

最佳答案

一般来说,水印与事件时间计时器相结合是解决乱序事件流所带来问题的方法。涵盖 Event Time and Watermarks 的官方 Flink 培训部分解释这是如何工作的。

在更高层次上,有时使用 Flink 的 CEP 库或 Flink SQL 之类的东西更容易,因为它们可以很容易地按时间对流进行排序,从而消除所有乱序。例如,参见 How to sort a stream by event time using Flink SQL有关使用 Flink SQL 按事件时间对流进行排序的 Flink DataStream 程序的示例。

在你的例子中,一个相当简单的 MATCH_RECOGNIZE查询会做你正在寻找的东西。这可能看起来像这样,

SELECT *
FROM event
MATCH_RECOGNIZE (
PARTITION BY particleId
ORDER BY ts
MEASURES
b.ts,
b.particleId,
velocity(a, b)
AFTER MATCH SKIP TO NEXT ROW
PATTERN (a b)
DEFINE
a AS TRUE,
b AS TRUE
)

其中 velocity(a, b) 是一个用户定义的函数,它在给定同一粒子的两个连续事件(a 和 b)的情况下计算速度。

关于scala - 如何使用 Apache flink 处理乱序事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65980505/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com