gpt4 book ai didi

java - Apache Flink 的吞吐量和延迟

转载 作者:行者123 更新时间:2023-11-30 10:28:30 24 4
gpt4 key购买 nike

我已经为 Apache Flink 编写了一个非常简单的 Java 程序,现在我有兴趣测量统计数据,例如吞吐量(每秒处理的元组数)和延迟(程序处理每个输入元组所需的时间)。

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
.map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv");

JobExecutionResult res = env.execute();

我知道 Flink 公开了一些指标:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

但我不确定如何使用它们来获得我想要的东西。从链接中我读到可以使用“仪表”来测量平均吞吐量,但是在定义它之后,我应该如何使用它?

最佳答案

我们在 yarn 上运行的生产流作业中运行自定义指标,例如 meter、gauge。

步骤如下:

对 pom.xml 的额外依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>

我们使用的是 1.2.1 版本

然后将 meter 添加到 MyMapper 类。

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Test {


public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
.map(new MyMapper())
.writeAsCsv("/home/LizardKing/Results.csv");

JobExecutionResult res = env.execute();
}


private static class MyMapper extends RichMapFunction<String, Object> {

private transient Meter meter;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
}

@Override
public Object map(String value) throws Exception {
this.meter.markEvent();
return value;
}
}
}

希望这对您有所帮助。

关于java - Apache Flink 的吞吐量和延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44587645/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com