gpt4 book ai didi

scala - Apache 弗林克 : stepwise execution

转载 作者:行者123 更新时间:2023-12-02 10:26:29 25 4
gpt4 key购买 nike

由于性能测量,我想逐步执行为 Flink 编写的 Scala 程序,即

execute first operator; materialize result;
execute second operator; materialize result;
...

等等。原代码:

var filename = new String("<filename>")
var text = env.readTextFile(filename)
var counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts.writeAsText("file://result.txt", WriteMode.OVERWRITE)
env.execute()

所以我想要执行 var counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1 ) 逐步进行。

在每个运算符之后调用 env.execute() 是正确的方法吗?

或者是在每次操作后写入/dev/null,即调用counts.writeAsText("file:///home/username/dev/null", WriteMode.OVERWRITE) 然后调用 env.execute() 是更好的选择吗? Flink 是否真的有类似 NullSink 的东西来达到这个目的?

编辑:我在集群上使用 Flink Scala Shell,并将应用程序设置为parallelism=1来执行上述代码。

最佳答案

Flink 默认使用管道式数据传输来提高作业执行的性能。但是,您也可以通过调用强制批量数据传输

ExecutionEnvironment env = ...
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);

这将分开两个运算符的执行(除非它们是链接的)。您可以从日志文件中获取每个任务的执行时间或查看 Web 仪表板。请注意,这不适用于链式运算符,即具有相同并行性并且不需要网络洗牌的运算符。另外,您应该意识到使用批量传输会增加程序的整体执行时间。我认为不可能真正分离管道数据处理器中运算符的执行时间。

在每个运算符之后调用 execute() 将不起作用,因为 Flink 尚不支持在内存中缓存结果。因此,如果执行运算符 2,则需要将运算符 1 的结果写入某个持久存储并再次读取,或者再次执行运算符 1。

关于scala - Apache 弗林克 : stepwise execution,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33691612/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com