
scala - How to unit test metrics in Flink


I have a datastream in Flink, and I produce my own metrics with a gauge inside a ProcessFunction.
Since these metrics matter for my events, I would like to unit test them once the stream has been executed.
Unfortunately, I have not found a way to implement a proper test reporter. Below is a short piece of code that illustrates my problem.
It has two issues:

  1. How do I trigger the gauge?
  2. How do I get env.execute to instantiate the reporter?

Here is the example:

import java.util.concurrent.atomic.AtomicInteger

import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.metrics.reporter.AbstractReporter
import org.apache.flink.metrics.{Gauge, Metric, MetricConfig}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import org.scalatest.PartialFunctionValues._

import scala.collection.JavaConverters._
import scala.collection.mutable

/* Test based on Flink test example https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html */

class MultiplyByTwo extends ProcessFunction[Long, Long] {
  override def processElement(data: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
    collector.collect(data * 2L)
  }

  val nbrCalls = new AtomicInteger(0)

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getMetricGroup
      .addGroup("counter")
      .gauge[Int, ScalaGauge[Int]]("call", ScalaGauge[Int](() => nbrCalls.get()))
  }
}

// create a testing sink
class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long): Unit = {
    synchronized {
      CollectSink.values.add(value)
    }
  }
}

object CollectSink {
  val values: java.util.ArrayList[Long] = new java.util.ArrayList[Long]()
}

class StackOverflowTestReporter extends AbstractReporter {
  var gaugesMetrics: mutable.Map[String, String] = mutable.Map[String, String]()

  override def open(metricConfig: MetricConfig): Unit = {}

  override def close(): Unit = {}

  override def filterCharacters(s: String): String = s

  def report(): Unit = {
    gaugesMetrics = this.gauges.asScala.map(t => (metricValue(t._1), t._2))
  }

  private def metricValue(m: Metric): String = {
    m match {
      case g: Gauge[_] => g.getValue.toString
      case _ => ""
    }
  }
}

class StackOverflowTest extends FunSuite with StreamingMultipleProgramsTestBase {

  def createConfigForReporter(reporterName: String): Configuration = {
    val cfg: Configuration = new Configuration()
    cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
      classOf[StackOverflowTestReporter].getName)
    cfg
  }

  test("test_metrics") {

    val env = StreamExecutionEnvironment.createLocalEnvironment(
      StreamExecutionEnvironment.getDefaultLocalParallelism,
      createConfigForReporter("reporter"))

    // configure your test environment
    env.setParallelism(1)

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements[Long](1L, 21L, 22L)
      .process(new MultiplyByTwo())
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should have length 3
    CollectSink.values should contain (2L)
    CollectSink.values should contain (42L)
    CollectSink.values should contain (44L)

    // verify the gauge counter
    // pseudo code ...
    val testReporter: StackOverflowTestReporter = _ // how do I get the reporter instantiated by env?
    testReporter.gaugesMetrics should have size 1
    testReporter.gaugesMetrics should contain key "counter.call"
    testReporter.gaugesMetrics.valueAt("counter.call") should equal ("3")
  }
}

Solution, thanks to Chesnay Schepler:

import java.util.concurrent.atomic.AtomicInteger

import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.metrics.reporter.MetricReporter
import org.apache.flink.metrics.{Metric, MetricConfig, MetricGroup}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.test.util.MiniClusterResource
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.PartialFunctionValues._
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import scala.collection.mutable

/* Test based on Flink test example https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html */

class MultiplyByTwo extends ProcessFunction[Long, Long] {
  override def processElement(data: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
    nbrCalls.incrementAndGet()
    collector.collect(data * 2L)
  }

  val nbrCalls = new AtomicInteger(0)

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getMetricGroup
      .addGroup("counter")
      .gauge[Int, ScalaGauge[Int]]("call", ScalaGauge[Int](() => nbrCalls.get()))
  }
}

// create a testing sink
class CollectSink extends SinkFunction[Long] {
  import CollectSink._

  override def invoke(value: Long): Unit = {
    synchronized {
      values.add(value)
    }
  }
}

object CollectSink {
  val values: java.util.ArrayList[Long] = new java.util.ArrayList[Long]()
}

class StackOverflowTestReporter extends MetricReporter {
  import StackOverflowTestReporter._

  override def open(metricConfig: MetricConfig): Unit = {}

  override def close(): Unit = {}

  override def notifyOfAddedMetric(metric: Metric, metricName: String, group: MetricGroup): Unit = {
    metric match {
      case gauge: ScalaGauge[_] =>
        // drop the scope components that are meaningless for the test; with the default
        // scope format these are the first 6 entries (host, "taskmanager", tm id, job name,
        // operator name, subtask index), leaving only the user-defined groups
        val gaugeKey = group.getScopeComponents.toSeq.drop(6).mkString(".") + "." + metricName
        gaugesMetrics(gaugeKey) = gauge.asInstanceOf[ScalaGauge[Int]]
      case _ =>
    }
  }

  override def notifyOfRemovedMetric(metric: Metric, metricName: String, group: MetricGroup): Unit = {}
}

object StackOverflowTestReporter {
  var gaugesMetrics: mutable.Map[String, ScalaGauge[Int]] = mutable.Map[String, ScalaGauge[Int]]()
}

class StackOverflowTest extends FunSuite with BeforeAndAfterAll {

  val miniClusterResource: MiniClusterResource = buildMiniClusterResource()

  override def beforeAll(): Unit = {
    CollectSink.values.clear()
    StackOverflowTestReporter.gaugesMetrics.clear()
    miniClusterResource.before()
  }

  override def afterAll(): Unit = {
    miniClusterResource.after()
  }

  def createConfigForReporter(): Configuration = {
    val cfg: Configuration = new Configuration()
    cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter" + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
      classOf[StackOverflowTestReporter].getName)
    cfg
  }

  def buildMiniClusterResource(): MiniClusterResource = new MiniClusterResource(
    new MiniClusterResource.MiniClusterResourceConfiguration(
      createConfigForReporter(), 1, 1, Time.milliseconds(50L)))

  test("test_metrics") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements[Long](1L, 21L, 22L)
      .process(new MultiplyByTwo())
      .addSink(new CollectSink())

    env.execute()

    CollectSink.values should have length 3
    CollectSink.values should contain (2L)
    CollectSink.values should contain (42L)
    CollectSink.values should contain (44L)

    // verify gauge counter
    val gaugeValues = StackOverflowTestReporter.gaugesMetrics.map(t => (t._1, t._2.getValue()))
    gaugeValues should have size 1
    gaugeValues should contain ("counter.call" -> 3)
  }
}
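
As a side note, MiniClusterResource lives in Flink's test utilities, so the test module needs that artifact on its classpath. A minimal sbt sketch, assuming a Flink 1.6.x / Scala 2.11 setup (the version number is a placeholder; use the one matching your project):

// build.sbt (sketch): flink-test-utils provides org.apache.flink.test.util.MiniClusterResource
val flinkVersion = "1.6.0" // placeholder version
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-test-utils"      % flinkVersion % Test
)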

Best Answer

Your best bet is to use a MiniClusterResource to explicitly start a cluster before the job, and to configure a reporter that checks for specific metrics and exposes them via static fields.

@Rule
public final MiniClusterResource clusterResource = new MiniClusterResource(
    new MiniClusterResourceConfiguration.Builder()
        .setConfiguration(getConfig())
        .build());

private static Configuration getConfig() {
    Configuration config = new Configuration();

    config.setString(
        ConfigConstants.METRICS_REPORTER_PREFIX +
            "myTestReporter." +
            ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
        MyTestReporter.class.getName());
    return config;
}

public static class MyTestReporter implements MetricReporter {
    static volatile Gauge<?> myGauge = null;

    @Override
    public void open(MetricConfig metricConfig) {
    }

    @Override
    public void close() {
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String name, MetricGroup metricGroup) {
        if ("myMetric".equals(name)) {
            myGauge = (Gauge<?>) metric;
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
    }
}
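
For completeness, here is a minimal sketch of the assertion side, assuming the rule above has started the cluster, the job has already been executed, and some operator registered a gauge named "myMetric" in its open() method; the expected value "3" and the ScalaTest matchers are illustrative assumptions, not part of the original answer:

// Sketch only: read the gauge that MyTestReporter captured through its static field
// (qualify MyTestReporter with its enclosing test class if it is declared as a nested class).
val capturedGauge = MyTestReporter.myGauge
capturedGauge should not be null
// Gauge#getValue returns the current value; compare its string form with the expected count.
capturedGauge.getValue.toString should be ("3")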

Regarding "scala - How to unit test metrics in Flink", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51675597/
