apache-flink - How to trigger a ProcessTimeTimer with the Flink MiniCluster

Reposted. Author: 行者123. Updated: 2023-12-04 09:00:27

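I have a Flink KeyedCoProcessFunction that registers processing-time timers as part of a larger Flink streaming job, and I'm trying to write a unit test for the whole job using the Flink MiniCluster. However, I can't get the onTimer() callback in the KeyedCoProcessFunction to fire.

Has anyone gotten this to work? Is any special configuration needed?

Switching to event time works fine, so I'm wondering whether processing-time timers simply don't work with the Flink MiniCluster or whether something is wrong with my implementation.

I wrote a simple test in Scala to see whether I could get it to work.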

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory

class TimerTest extends AnyFlatSpec with BeforeAndAfter {

  private val SlotsPerTaskMgr = 1

  // One TaskManager with a single slot is enough for a parallelism-1 job.
  val flinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
      .setNumberSlotsPerTaskManager(SlotsPerTaskMgr)
      .setNumberTaskManagers(1)
      .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "MiniCluster" should "trigger onTimer" in {
    // After flinkCluster.before(), this returns an environment bound to the MiniCluster.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    implicit val longTypeInfo: TypeInformation[Long] = TypeInformation.of(classOf[Long])

    val sink = new TestListResultSink[Long]

    env.addSource(new MyLongSource(100))
      .keyBy(v => v)
      .process(new MyProcessor())
      .addSink(sink)

    env.execute()

    println("Received " + sink.getResult.size() + " output records.")
  }
}

class MyProcessor extends KeyedProcessFunction[Long, Long, Long] {

  private val log = LoggerFactory.getLogger(this.getClass)

  override def processElement(
      value: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#Context,
      out: Collector[Long]): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    log.info("Received {} at {}", value, now)
    if (value % 10 == 0) {
      // Register a processing-time timer for the current key, 10 ms from now.
      log.info("Scheduling processing timer for {}", now + 10)
      ctx.timerService().registerProcessingTimeTimer(now + 10)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
      out: Collector[Long]): Unit = {
    log.info("Received onTimer at {}", timestamp)
    out.collect(timestamp)
  }
}

class MyLongSource(n: Int) extends ParallelSourceFunction[Long] {
  @volatile private var stop = false

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 1
    while (!stop && i <= n) {
      println("Sending " + i)
      ctx.collect(i)
      i += 1
    }

    // Workaround: keep the source (and therefore the job) alive long enough for the
    // 10 ms processing-time timers to fire; once the job finishes, pending timers are dropped.
    if (!stop) Thread.sleep(1000)
  }

  override def cancel(): Unit = {
    stop = true
  }
}

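By adding Thread.sleep(1000) at the end of the source's run() method, I was finally able to get consistent results. It seems that once the source exits, the remaining records are processed and then everything shuts down, even if timers are still pending.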
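The race described above can be illustrated with a deliberately simplified, hypothetical model in plain Scala (no Flink involved): the job stops at some processing time, timers due at or before that instant fire, and later ones are discarded. `TimerShutdownModel` and the timings are assumptions chosen to mirror the question's `currentProcessingTime() + 10` timers; this is not how Flink implements its timer service.

```scala
// Hypothetical toy model, NOT Flink code: a job stops at `stopAt` (ms of processing
// time); timers due at or before that instant fire, later ones are silently dropped.
object TimerShutdownModel {

  /** Splits timer timestamps into (fired, dropped) for a job that stops at `stopAt`. */
  def outcome(timerTimestamps: Seq[Long], stopAt: Long): (Seq[Long], Seq[Long]) =
    timerTimestamps.sorted.partition(_ <= stopAt)

  def main(args: Array[String]): Unit = {
    // Assumption: the last record is processed at t = 0 and registers a timer for
    // t + 10 ms, like registerProcessingTimeTimer(currentProcessingTime() + 10).
    val timers = Seq(10L)

    // Source returns immediately; assume the job is torn down about 1 ms later.
    val (firedFast, droppedFast) = outcome(timers, stopAt = 1L)
    // Source sleeps 1000 ms before returning, so the clock passes the timer first.
    val (firedSlow, droppedSlow) = outcome(timers, stopAt = 1000L)

    println(s"no sleep:   fired=$firedFast dropped=$droppedFast")
    println(s"sleep 1000: fired=$firedSlow dropped=$droppedSlow")
  }
}
```

Without the sleep, the timer due at t = 10 ms falls after the stop and is dropped; with the 1000 ms sleep it fires. In the real job the gap depends on how fast the remaining records are processed, which is why results without the sleep were inconsistent.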

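Best answer

When a Flink job shuts down, any pending processing-time timers are ignored. They never fire.

For what it's worth, there is an ongoing discussion on the Flink dev mailing list about providing an option to fire all pending processing-time timers. See: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-134-DataStream-Semantics-for-Bounded-Input-td37365.html#a37558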
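Independently of that proposal, the timer logic itself can be tested without depending on the wall clock by using Flink's operator test harnesses, which let a test advance processing time by hand. A minimal sketch, assuming Flink 1.10 or later with the flink-streaming-java and flink-runtime test-jars on the test classpath (`ProcessFunctionTestHarnesses` ships in the former); `TimerEchoFunction` and `TimerEchoHarnessCheck` are hypothetical names, and the function only mirrors the question's processor. This checks the function in isolation, not the whole MiniCluster job.

```scala
// Assumes Flink >= 1.10 with the flink-streaming-java and flink-runtime test-jars available.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses
import org.apache.flink.util.Collector

// Hypothetical function mirroring the question's processor: every 10th value registers
// a processing-time timer 10 ms ahead, and onTimer emits the timer's timestamp.
class TimerEchoFunction extends KeyedProcessFunction[Long, Long, Long] {
  override def processElement(
      value: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#Context,
      out: Collector[Long]): Unit = {
    if (value % 10 == 0) {
      ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
      out: Collector[Long]): Unit =
    out.collect(timestamp)
}

// Hypothetical helper that drives the function through a keyed test harness.
object TimerEchoHarnessCheck {
  def run(): java.util.List[Long] = {
    val harness = ProcessFunctionTestHarnesses.forKeyedProcessFunction(
      new TimerEchoFunction,
      new KeySelector[Long, Long] { override def getKey(value: Long): Long = value },
      TypeInformation.of(classOf[Long]))
    try {
      harness.setProcessingTime(0L) // pin the manual processing-time clock at 0
      (1L to 100L).foreach(v => harness.processElement(v, 0L)) // keys 10, 20, ..., 100 register timers at t = 10
      harness.setProcessingTime(20L) // advancing past t = 10 fires every due timer
      harness.extractOutputValues() // records emitted by onTimer
    } finally {
      harness.close()
    }
  }
}
```

Because processing time only moves when `setProcessingTime` is called, the result does not depend on how long the job happens to run, which is what makes the MiniCluster version sensitive to the source's lifetime.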

Regarding apache-flink - How to trigger a ProcessTimeTimer with the Flink MiniCluster, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/63586762/
