
scala - How to unit test a BroadcastProcessFunction in Flink when processElement depends on broadcast data


I implemented a Flink stream with a BroadcastProcessFunction. I receive my model in processBroadcastElement and apply it to the events that arrive in processElement.

I have not found a way to unit test this stream, because I have not found a way to guarantee that the model is dispatched before the first event.
I would say there are two ways to achieve this:
1. Find a way to push the model into the stream first
2. Populate the broadcast state with the model before the stream executes, so that it is restored (see the harness sketch after the test below)

I may have missed something, but I have not found a simple way to do this.

Here is a simple unit test that illustrates my problem:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfter, FunSuite}

import scala.collection.mutable


class BroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {

  import BroadCastProcessor._

  override def processElement(value: Int,
                              ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)

    if (broadcastState.contains(value)) {
      out.collect(broadcastState.get(value))
    }
  }

  override def processBroadcastElement(value: (Int, String),
                                       ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
                                       out: Collector[String]): Unit = {
    ctx.getBroadcastState(broadcastStateDescriptor).put(value._1, value._2)
  }
}

object BroadCastProcessor {
  val broadcastStateDescriptor: MapStateDescriptor[Int, String] =
    new MapStateDescriptor[Int, String]("int_to_string", classOf[Int], classOf[String])
}

class CollectSink extends SinkFunction[String] {

  import CollectSink._

  override def invoke(value: String): Unit = {
    values += value
  }
}

object CollectSink { // must be static
  val values: mutable.MutableList[String] = mutable.MutableList[String]()
}

class BroadCastProcessTest extends FunSuite with BeforeAndAfter {

  before {
    CollectSink.values.clear()
  }

  test("add_elem_to_broadcast_and_process_should_apply_broadcast_rule") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataToProcessStream = env.fromElements(1)

    val ruleToBroadcastStream = env.fromElements(1 -> "1", 2 -> "2", 3 -> "3")

    val broadcastStream = ruleToBroadcastStream.broadcast(BroadCastProcessor.broadcastStateDescriptor)

    dataToProcessStream
      .connect(broadcastStream)
      .process(new BroadCastProcessor)
      .addSink(new CollectSink())

    // execute
    env.execute()

    CollectSink.values should contain("1")
  }
}
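For reference, approach 2 can also be exercised at the function level rather than with a full job, by pre-populating the broadcast state through Flink's test harnesses. The following is only a minimal sketch, assuming a Flink version that ships ProcessFunctionTestHarnesses (1.10 or later, i.e. newer than the version used at the time of the question) together with the flink-streaming-java test-jar and flink-test-utils on the test classpath; exact harness method names may differ between versions. Feeding the broadcast element first makes the ordering deterministic:

import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses
import org.scalatest.FunSuite
import org.scalatest.Matchers._

class BroadCastProcessorHarnessTest extends FunSuite {

  test("model_pushed_before_event_should_apply_broadcast_rule") {
    // Wrap the function under test; the broadcast state descriptor must be registered here
    val harness = ProcessFunctionTestHarnesses.forBroadcastProcessFunction(
      new BroadCastProcessor, BroadCastProcessor.broadcastStateDescriptor)

    // The model element is processed first, then the event: no race is possible
    harness.processBroadcastElement(1 -> "1", 10L)
    harness.processElement(1, 20L)

    harness.extractOutputValues() should contain("1")
  }
}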

Update, thanks to David Anderson:
I went with the buffering solution. I defined a process function for the synchronization:
import org.apache.flink.streaming.api.functions.co.CoProcessFunction

class SynchronizeModelAndEvent(modelNumberToWaitFor: Int) extends CoProcessFunction[Int, (Int, String), Int] {

  // Events that arrive before the model is complete are parked here
  val eventBuffer: mutable.MutableList[Int] = mutable.MutableList[Int]()
  var modelEventsNumber = 0

  override def processElement1(value: Int, ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
    if (modelEventsNumber < modelNumberToWaitFor) {
      eventBuffer += value
      return
    }
    out.collect(value)
  }

  override def processElement2(value: (Int, String), ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
    modelEventsNumber += 1

    if (modelEventsNumber >= modelNumberToWaitFor) {
      // Release the buffered events once the expected number of model elements has been seen,
      // then clear the buffer so that later model elements do not emit them again
      eventBuffer.foreach(event => out.collect(event))
      eventBuffer.clear()
    }
  }
}

So I need to add it to my stream:
dataToProcessStream
  .connect(ruleToBroadcastStream)
  .process(new SynchronizeModelAndEvent(3))
  .connect(broadcastStream)
  .process(new BroadCastProcessor)
  .addSink(new CollectSink())
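Note that with this wiring ruleToBroadcastStream is consumed twice: once plainly by SynchronizeModelAndEvent, which only counts model elements and releases the buffered events, and once as broadcastStream, which is what actually populates the broadcast state read by BroadCastProcessor.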

Thanks

Best Answer

There is no simple way to do this. You could have processElement buffer all of its input until processBroadcastElement has received the model. Or run the job once with no event traffic and take a savepoint once the model has been broadcast, then restore that savepoint into the same job, but with its event input connected.
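To make the first suggestion concrete, here is a minimal sketch (an illustration, not code from the post) of a BroadcastProcessFunction that parks events in an in-memory list until the first broadcast element has arrived and replays them afterwards. The buffer is not part of Flink state, so it is lost on restore; that is acceptable in a test but not in a fault-tolerant job.

// Hypothetical variant of BroadCastProcessor, shown for illustration only.
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

class BufferingBroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {

  import BroadCastProcessor._

  // Plain in-memory buffer: not checkpointed, fine for a test, not for production
  private val pendingEvents = mutable.MutableList[Int]()
  private var modelReceived = false

  override def processElement(value: Int,
                              ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    if (!modelReceived) {
      // Model not seen yet: park the event
      pendingEvents += value
    } else {
      val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)
      if (broadcastState.contains(value)) out.collect(broadcastState.get(value))
    }
  }

  override def processBroadcastElement(value: (Int, String),
                                       ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
                                       out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)
    broadcastState.put(value._1, value._2)
    modelReceived = true // could instead count elements if the model spans several records

    // Replay the events that arrived before the model
    pendingEvents.foreach { e =>
      if (broadcastState.contains(e)) out.collect(broadcastState.get(e))
    }
    pendingEvents.clear()
  }
}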

By the way, the capability you are looking for is often referred to in the Flink community as "side inputs".

About "scala - How to unit test a BroadcastProcessFunction in Flink when processElement depends on broadcast data": we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54667508/
