gpt4 book ai didi

java - Flink CEP 不在事件时间工作,但在处理时间工作

转载 作者:行者123 更新时间:2023-12-01 18:17:41 26 4
gpt4 key购买 nike

当我使用 Flink CEP 代码来处理时间时(默认配置),我能够获得所需的模式匹配,但在将环境配置为事件时间时,我无法获得任何模式匹配。

 def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(3000) // checkpoint every 3000 msec
val lines = env.addSource(consumerKafkaSource.consume("bank_transaction_2", "192.168.2.201:9092", "192.168.2.201:2181", "http://192.168.2.201:8081"))

val eventdate = ExtractAndAssignEventTime.assign(lines, "unix", "datetime", 3) //Extracting date time here

val event = eventdate.keyBy(v => v.get("customer_id").toString.toInt)
val pattern1 = Pattern.begin[GenericRecord]("start").where(v=>v.get("state").toString=="FAILED").next("d").where(v=>v.get("state").toString=="FAILED")
val patternStream = CEP.pattern(event, pattern1)
val warnID = patternStream.sideOutputLateData(latedata).select(value => {
val v = value.mapValues(c => c.toList.toString)
Json(DefaultFormats).write(v).replace("\\\"", "\"")
//.replace("List(","{").replace(")","}")
})
val latedatastream = warnID.getSideOutput(latedata)
latedatastream.print("late_data")


warnID.print("warning")
event.print("event")

时间戳提取代码

object ExtractAndAssignEventTime {
def assign(stream:DataStream[GenericRecord],timeFormat:String,timeColumn:String,OutofOrderTime:Int ):DataStream[GenericRecord] ={
if(!(timeFormat.equalsIgnoreCase("Unix"))){
val EventTimeStream=stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(3)) {
override def extractTimestamp(t: GenericRecord): Long = {
new java.text.SimpleDateFormat(timeFormat).parse(t.get(timeColumn).toString).getTime
}
})
EventTimeStream
}
else{
val EventTimeStream=stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(OutofOrderTime)) {
override def extractTimestamp(t: GenericRecord): Long = {
(t.get(timeColumn).toString.toLong)
}
})
EventTimeStream
}
}

请帮我解决这个问题。提前致谢。!

最佳答案

由于您使用的是AssingerWithPeriodicWatermark,您还需要设置setAutowatermarkInterval,以便Flink使用此间隔来生成水印。

您可以通过调用env.getConfig.setAutoWatermarkInterval([interval])来完成此操作。

对于事件时间CEP,基于水印,因此如果不生成水印,则基本上不会有输出。

关于java - Flink CEP 不在事件时间工作,但在处理时间工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60335612/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com