- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 Flink (Java) 中有这个程序,它可以计算数据流中的不同单词。我使用计数单词的示例来实现,并在同一时间应用另一个窗口来评估不同的值。该程序运行良好。但是,我担心我正在使用两个窗口来处理不同的计数。第一个窗口计算单词数,第二个窗口将单词数切换为 1
,并将单词切换为 Tuple2
的第二个元素。我数了数 key 的数量。这是我的程序的输入和输出:
// input:
aaa
aaa
bbb
ccc
bbb
aaa
output:
(3,bbb-ccc-aaa)
如果我删除第二个窗口,它会显示每个键的所有评估并保存前一个窗口的状态。
// input:
aaa
aaa
bbb
ccc
bbb
aaa
// output:
3> (1,bbb)
3> (2,bbb-aaa)
3> (3,bbb-aaa-ccc)
// wait the first window to be evaluated.
// input:
aaa
aaa
bbb
ccc
bbb
aaa
// output:
3> (4,bbb-aaa-ccc-ccc)
3> (5,bbb-aaa-ccc-ccc-bbb)
3> (6,bbb-aaa-ccc-ccc-bbb-aaa)
我的程序:
public class WordCountDistinctSocketFilterQEP {
public WordCountDistinctSocketFilterQEP() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// @formatter:off
env.socketTextStream("localhost", 9000)
.flatMap(new SplitterFlatMap())
.keyBy(new MyKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new CountReduceFunction())
.map(new SwapMapFunction())
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // TESTING REMOVING THIS WINDOW
.reduce(new CountDistinctFunction())
.print();
// @formatter:on
String executionPlan = env.getExecutionPlan();
System.out.println("ExecutionPlan ........................ ");
System.out.println(executionPlan);
System.out.println("........................ ");
// dataStream.print();
env.execute("WordCountDistinctSocketFilterQEP");
}
public static class SwapMapFunction implements MapFunction<Tuple2<String, Integer>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 5148172163266330182L;
@Override
public Tuple2<Integer, String> map(Tuple2<String, Integer> value) throws Exception {
return Tuple2.of(1, value.f0);
}
}
public static class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 3121588720675797629L;
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
public static class MyKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
private static final long serialVersionUID = 2787589690596587044L;
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}
public static class CountReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 8541031982462158730L;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}
public static class CountDistinctFunction implements ReduceFunction<Tuple2<Integer, String>> {
private static final long serialVersionUID = -7077952757215699563L;
@Override
public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2)
throws Exception {
return Tuple2.of(value1.f0 + value2.f0, value1.f1 + "-" + value2.f1);
}
}
}
最佳答案
ReduceFunctions
与 Collections
更好地合作( Maps
、 Lists
、 Sets
)。如果将每个单词映射到一个元素 Set
,你可以写一个 ReduceFunction
运行于 Set<String>
然后你可以用一个 ReduceFunction
来做到这一点而不是两个。
所以有splitterFlatMap
返回一系列由一个元素组成的长 Set<String>
, MyKeySelector
返回每个集合的第一个元素。窗口函数很好,更改reduce函数以匹配Set<String>
类型,函数的核心是 value1.addAll(value2)
。此时,您已经获得了输入中所有唯一单词的集合,这些单词分布在您正在运行的多个并行任务中。根据完成后您将所有这些数据放在哪里,这可能就足够了。否则,您可以在其末尾放置一个全局窗口,并再次使用相同的reduce函数(解释如下)
你的第二个问题是这不会按原样扩展。在某种程度上,这更像是一个哲学问题。如果不让每个并行实例都与其他实例通信,您就无法真正获得跨并行实例的全局计数。不过,您可以做的是通过实际单词对拆分单词流进行键控,然后使用(并行)键控、窗口 ReduceFunction
获取每个键组中不同单词的列表。然后你可以再吃一个ReduceFunction
这不是并行的,它结合了并行结果的结果。您还希望第二个窗口也打开; WindowFunctions
在触发之前等待所有上游运算符达到正确的水印,因此窗口将确保您的非并行运算符接收来自每个并行运算符的输入。非并行运算符上的聚合是简单的串联,因为一开始的键控保证给定的单词将恰好存在于一个并行槽中。
很明显,单个非并行运算符可能会出现瓶颈,但负载规模与不同单词的总数有关,实际上,由于英语的工作方式,负载规模可能仅限于 10k 单词左右.
关于java - 如何提高 Flink 中数据流实现的不同计数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56524962/
1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势。尽管 SQL
1.概述 转载:Flink 源码阅读笔记(6)- 计算资源管理 在 Flink 中,计算资源的是以 Slot 作为基本单位进行分配的。本文将对 Flink 中计算资源的管理机制加以分析。 2.Task
1.概述 转载:Flink jvm参数配置GC日志 生产环境上,或者其他要测试 GC 问题的环境上,一定会配置上打印GC日志的参数,便于分析 GC 相关的问题。 但是可能很多人配置的都不够“完美”,要
1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl
1.概述 转载:Flink SQL代码生成与UDF重复调用的优化 2. 代码生成简介 代码生成(code generation)是当今各种数据库和数据处理引擎广泛采用的物理执行层技术之一。通过代码生成
1.概述 转载:面向流批一体的 Flink Runtime 新进展 首先是关于调度部分的性能优化。Flink 由于存在 all to all 的连接关系,两个并发为 n 的算子之间会有 n² 条边,这
在Fink源码中,有flink-stream-java和flink-stream-scala模块。 flink streaming 为什么需要两个模块? https://github.com/apac
我的要求是在一天内流式传输数百万条记录,并且它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式传输。这些是应
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
1.概述 转载:Flink 源码阅读笔记(4)- RPC 相关文章: 【Flink】Flink 源码之RPC调用 Flink】FLink 通讯组件 RPC 作为一个分布式系统,Flink 内部不同组件
1.概述 转载并且补充: flink keyby 分布不均匀问题 我使用随机数random.nextint(8)作为key,生成keyedstream之后,直接sink到存储中,但是sink算子只有四
1.概述 转载:Flink Sort-Shuffle写流程简析 转载并且补充。 2.配置 taskmanager.network.sort-shuffle.min-parallelism 核心配置。设
1.概述 转载:Flink源码分析——批处理模式Map端数据聚合 在flink的批处理模式下,数据的计算也有着map/reduce两端的计算模型,这一点和MR、spark计算框架是类似的。在数据进行分
1.概述 转载:Flink on yarn 远程调试 大家好,我是 JasonLee。 前几天有小伙伴问我,我写的 Flink 代码是提交到 yarn 上去运行的,那我怎么能远程调试代码呢?在本地调试
当我使用 flink 事件时间窗口时,窗口就是不触发。请问如何解决,有什么debug的方法吗? 最佳答案 由于您使用的是事件时间窗口,所以很可能是水印问题。该窗口仅在水印取得进展时输出。事件时间没有提
我有一个用例,我想在 Flink 上运行 2 个独立的处理流程。所以 2 个流程看起来像 Source1 -> operator1 -> Sink1 Source2 -> operator2 -> S
我们正在尝试构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。通过阅读文档,在我看来,Flink 广播状态很适合这种情况。 作为实验,我构建了一个简化版本:假设我有一
我有一个 Flink Streaming 作业,它失败了,我得到如下日志。谁能告诉我如何解决这个问题?有时运行一天就失效,有时运行几个小时就失效。 09:30:25 948 INFO (org.ap
我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafk
我是一名优秀的程序员,十分优秀!