- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 Flink KeyedCoProcessFunction
在更大的 Flink 流作业中注册处理时间计时器,我正在尝试使用 Flink MiniCluster 为整个作业创建单元测试。 .但是我无法让 KeyedCoProcessFunction
中的 onTimer()
回调触发。
有没有人让这个工作?是否需要任何特殊配置?
切换到 Event Time 工作正常,所以我想知道这是否不适用于 Flink MiniCluster 还是我的实现有问题。
我用 Scala 编写了一个简单的测试,看看我是否能让它工作。
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory
class TimerTest extends AnyFlatSpec with BeforeAndAfter {
private val SlotsPerTaskMgr = 1
val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(SlotsPerTaskMgr)
.setNumberTaskManagers(1)
.build)
before {
flinkCluster.before()
}
after {
flinkCluster.after()
}
"MiniCluster" should "trigger onTimer" in {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
implicit val longTypeInfo: TypeInformation[Long] = TypeInformation.of(classOf[Long])
val sink = new TestListResultSink[Long]
env.addSource(new MyLongSource(100))
.keyBy(v => v)
.process(new MyProccesor())
.addSink(sink)
env.execute()
println("Received " + sink.getResult.size() + " output records.")
}
}
class MyProccesor extends KeyedProcessFunction[Long, Long, Long] {
private val log = LoggerFactory.getLogger(this.getClass)
override def processElement(
value: Long,
ctx: KeyedProcessFunction[Long, Long, Long]#Context,
out: Collector[Long]): Unit = {
log.info("Received {} at {}", value, ctx.timerService().currentProcessingTime())
if (value % 10 == 0) {
log.info("Scheduling processing timer for {}", ctx.timerService().currentProcessingTime() + 10)
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10)
}
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
out: Collector[Long]): Unit = {
log.info("Received onTimer at {}", timestamp)
out.collect(timestamp)
}
}
class MyLongSource(n:Int) extends ParallelSourceFunction[Long] {
@volatile private var stop = false
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
for(i <- 1 to n) {
if(stop) return;
println("Sending " + i)
ctx.collect(i)
}
Thread.sleep(1000)
}
override def cancel(): Unit = {
stop = true
}
}
通过在源 run()
方法的末尾添加 Thread.sleep(1000)
,我终于能够获得一些一致的结果。似乎一旦源退出,消息就会得到处理,然后即使有挂起的计时器,一切都会关闭。
最佳答案
当 Flink 作业关闭时,任何挂起的处理时间计时器都会被忽略。他们从不开火。
就其值(value)而言,Flink 开发邮件列表上正在进行一些关于提供触发所有待处理时间计时器的选项的讨论。参见 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-134-DataStream-Semantics-for-Bounded-Input-td37365.html#a37558 .
关于apache-flink - Flink MiniCluster 如何触发 ProcessTimeTimer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63586762/
本文整理了Java中com.github.sakserv.minicluster.util.WindowsLibsUtils类的一些代码示例,展示了WindowsLibsUtils类的具体用法。这些代
本文整理了Java中com.github.sakserv.minicluster.impl.YarnLocalCluster类的一些代码示例,展示了YarnLocalCluster类的具体用法。这些代
本文整理了Java中com.github.sakserv.minicluster.impl.ZookeeperLocalCluster类的一些代码示例,展示了ZookeeperLocalCluster
本文整理了Java中com.uber.hoodie.common.minicluster.ZookeeperTestService类的一些代码示例,展示了ZookeeperTestService类的具
我有一个 Flink KeyedCoProcessFunction 在更大的 Flink 流作业中注册处理时间计时器,我正在尝试使用 Flink MiniCluster 为整个作业创建单元测试。 .但
本文整理了Java中com.github.sakserv.minicluster.util.WindowsLibsUtils.setHadoopHome()方法的一些代码示例,展示了WindowsLi
本文整理了Java中com.github.sakserv.minicluster.impl.YarnLocalCluster.getConfig()方法的一些代码示例,展示了YarnLocalClus
本文整理了Java中com.github.sakserv.minicluster.impl.YarnLocalCluster.stop()方法的一些代码示例,展示了YarnLocalCluster.s
本文整理了Java中com.github.sakserv.minicluster.impl.ZookeeperLocalCluster.stop()方法的一些代码示例,展示了ZookeeperLoca
本文整理了Java中com.uber.hoodie.common.minicluster.ZookeeperTestService.start()方法的一些代码示例,展示了ZookeeperTestS
本文整理了Java中com.uber.hoodie.common.minicluster.ZookeeperTestService.()方法的一些代码示例,展示了ZookeeperTestServic
我在 MacOS 开发环境中使用 Apache Hadoop 2.2.0。当尝试按照 apache 文档中的描述运行 hadoop minicluster 时: hadoop jar ./share/
我是一名优秀的程序员,十分优秀!