- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
Flink 通常处理流式、无限数据集的计算引擎,窗口是一种把无限流式数据集切割成有限的数据集进行计算。window窗口在Flink中极其重要。
window 注意分为两大类型:CountWindow 和 TimeWindow
CountWindow 是与时间没有关系的,比如 数据收集的一定大小(1w)的时候就会触发窗口函数进行计算。
TimeWindow 就是时间窗口,它与时间非常紧密。主要分为三大类:
滚动窗口(Tumbling window)、滑动窗口(Sliding window)、回话窗口(Session window)。
特征:时间对齐,没有重叠,并且时间窗口大小固定
比如:计算五分钟内的数据。窗口起始时间假设为 10:00 那么时间到了 10:05 时刻就会触发 10:00 - 10:05 段时间窗口,注意区间是左闭右开。
与滚动窗口不同的是,滑动窗口多了滑动间隔时间,那么就会出现数据的重叠或者数据丢失。如果滑动时间间隔小于滑动窗口大小,那么就是出现数据的重叠,也就是重叠的数据可能被多个窗口计算;如果滑动时间间隔等于滑动窗口大小,那么就相等于滚动窗口;如果滑动时间间隔大于滑动时间窗口,那么就会出现数据的丢失。
比如:每隔1分钟计算五分钟内的数据,假设窗口起始时间为 10:00,那么就会触发 9:55 - 10:00 段的时间窗口,时间到了 10:01,那么就会触发 9:56 - 10:01 段的时间窗口,以此类推,注意窗口都是左闭右开的。
类似于Web应用的 Session 回话,简单一句话,在一定时间内没有接收到任务数据,那么上一次窗口就触发计算。
比如:5分钟内没有收到任何数据,就触发上一次窗口计算,并重新开一个窗口。假设接收到第一条数据的时间是 9:54(时间任意),目前窗口时间为 10:00 ,那么到了 10:05 并且在 10:00 - 10:05 段时间内没有接收任何数据,就会触发 9:54 - 10:00 段的窗口计算,并重新以 10:05 开一个新的窗口,往后有数据以此类推,如果还是在 10:05 - 10:10 没有数据,其实计不计算都显得不重要了。主要的特征是触发窗口时间不固定,窗口时间也不固定。
windwo API 核心是窗口适配器(windowAssigner)。windwo(入参) 的入参接受的就是窗口适配器,它负责把数据分发到正确的 window中。但是 Flink 已经提供好了通用的窗口适配器。
Flink 默认的时间窗口是根据 processing time(处理时间)进行划分和计算。但是应用大多数都是 Event time(事件时间),后面具体介绍,目前window API 默认使用 processing time。窗口还可以分为不同流窗口(也就是 keyby之后的数据)和全窗口(也就是全部数据)。
计算传感器五秒之内的最大温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Tumbling {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L)) // 滚动时间窗口大小
.maxBy("temperature");
resultDataStream.print();
env.execute();
}
}
每隔2秒计算传感器5秒内的最小温度值
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L), Time.seconds(2)) // 窗口大小5秒 滑动大小2秒
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L), Time.seconds(2)) // 窗口大小5秒 滑动大小2秒
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L), Time.seconds(2)) // 窗口大小5秒 滑动大小2秒
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
根据传感器id每传来3条数据就计算这些数据的最大温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CountWindow_Tumbling {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.countWindow(3)
.maxBy("temperature");
resultDataStream.print();
env.execute();
}
}
根据传感器id每隔2条数据计算最近5条数据的最小温度值
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CountWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.countWindow(5, 2)
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}
windwo function 主要针对触发窗口计算的操作,主要分为:
增量聚合函数、全窗口函数、其他可选 API。
简单地说就是,每来一条数据就计算,相当于预聚合,保持一个简单的状态。典型的增量聚合函数有 ReduceFunction、AggregateFunction。
先把全窗口的数据收集起来,等到计算的时候遍历计算。典型的全窗口函数 ProcessWindowFunction、windowFunction。它与增量聚合不同的是,增量聚合来一条数据就会先预聚合,等到窗口触发计算函数;全窗口函数是等窗口函数触发收集该窗口的全部数据一起计算。如果一个窗口数据突然特别多,那么就会造成压力;可以选择增量聚合函数。
触发器它定义 窗口什么时候关闭,触发计算并输出结果
定义移除某些数据的逻辑
可能某种原因(网络延迟)导致这些数据所在窗口已经触发了计算,所有这些数据可以允许迟到,但是这些数据不会加入其他窗口进行计算,而是输出到侧输出流进行计算。
将迟到的数据放入侧输出流
根据传感器id计算5秒内的的某时刻最大温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowFunction_ReduceFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L))
.reduce(new CustomReduceFunction(), new CustomWindowFunctiom()); // 第一个参数就是每来一条数据就进行计算 第二个参数就是最终窗口触发计算
resultDataStream.print();
env.execute();
}
// 输入数据的泛性
public static class CustomReduceFunction implements ReduceFunction<SensorReading> {
@Override
public SensorReading reduce(SensorReading sensorReading, SensorReading input) throws Exception {
if (sensorReading.getTemperature() > input.getTemperature()) {
return sensorReading;
} else {
return input;
}
}
}
/**
* SensorReading -> 输入数据类型
* String -> 输出类型
* String -> key 类型
* TimeWindow -> 窗口类型(TimeWindow 和 GlobalWindow)
*/
public static class CustomWindowFunctiom implements WindowFunction<SensorReading, String, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<SensorReading> iterable, Collector<String> collector) throws Exception {
iterable.iterator().forEachRemaining(sensor -> {
// 拼接输出字符串
String output = sensor.getId() + "在 " + sensor.getTimestamp() + "最大温度是:" + sensor.getTemperature();
collector.collect(output);
});
}
}
}
根据传感器id计算10秒内的平均温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowFunction_AggregateFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(10L))
.aggregate(new CustomAggregateFunction(), new CustomWindowFunction());
resultDataStream.print();
env.execute();
}
/**
* SensorReading -> 输入类型
* Tuple2<Long, Double> -> 保存中间结果状态 也就是 来一条数据 Long + 1 Double + temperature
* Tuple2<Long, Double> -> 输出结果
*/
public static class CustomAggregateFunction implements AggregateFunction<SensorReading, Tuple2<Long, Double>, Tuple2<Long, Double>> {
// 初始化中间状态
@Override
public Tuple2<Long, Double> createAccumulator() {
return new Tuple2<>(0L, 0.0);
}
// 累计输入的数据
@Override
public Tuple2<Long, Double> add(SensorReading input, Tuple2<Long, Double> tuple2) {
tuple2.setFields(tuple2.f0 + 1, tuple2.f1 + input.getTemperature());
return tuple2;
}
// 返回
@Override
public Tuple2<Long, Double> getResult(Tuple2<Long, Double> tuple2) {
return tuple2;
}
// 区间累加
@Override
public Tuple2<Long, Double> merge(Tuple2<Long, Double> input1, Tuple2<Long, Double> input2) {
Tuple2<Long, Double> result = new Tuple2<>(input1.f0 + input2.f0, input1.f1 + input2.f1);
return result;
}
}
/**
* Tuple2<Long,Double> -> 输入结果 也就是 CustomAggregateFunction 中的输出结果
* Tuple -> key 类型
* TimeWindow -> 窗口类型
*/
public static class CustomWindowFunction implements WindowFunction<Tuple2<Long,Double>,String, Tuple, TimeWindow>{
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, Double>> iterable, Collector<String> collector) throws Exception {
Tuple2<Long, Double> result = iterable.iterator().next();
double avgTemp = result.f1 / result.f0;
String resultStr = tuple + "在10秒内的平均温度为:" + avgTemp;
collector.collect(resultStr);
}
}
}
根据传感器id计算10秒内的平均温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Iterator;
public class WindowFunction_ProcessWindowFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(10L))
.process(new CustomProcessWindowFunction());
resultDataStream.print();
env.execute();
}
/**
* SensorReading -> 输入类型
* String -> 输出类型
* Tuple -> key 类型
* TimeWindow -> 时间窗口
*/
public static class CustomProcessWindowFunction extends ProcessWindowFunction<SensorReading, String, Tuple, TimeWindow> {
@Override
public void process(Tuple tuple, Context context, Iterable<SensorReading> iterable, Collector<String> collector) throws Exception {
long count = 0;
double tempTotal = 0.0;
Iterator<SensorReading> allDatas = iterable.iterator();
while (allDatas.hasNext()) {
SensorReading sensor = allDatas.next();
count++;
tempTotal += sensor.getTemperature();
}
double avgTemp = tempTotal / count;
String resultStr = tuple + "在10秒内平均温度为:" + avgTemp;
collector.collect(resultStr);
}
}
}
1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势。尽管 SQL
1.概述 转载:Flink 源码阅读笔记(6)- 计算资源管理 在 Flink 中,计算资源的是以 Slot 作为基本单位进行分配的。本文将对 Flink 中计算资源的管理机制加以分析。 2.Task
1.概述 转载:Flink jvm参数配置GC日志 生产环境上,或者其他要测试 GC 问题的环境上,一定会配置上打印GC日志的参数,便于分析 GC 相关的问题。 但是可能很多人配置的都不够“完美”,要
1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl
1.概述 转载:Flink SQL代码生成与UDF重复调用的优化 2. 代码生成简介 代码生成(code generation)是当今各种数据库和数据处理引擎广泛采用的物理执行层技术之一。通过代码生成
1.概述 转载:面向流批一体的 Flink Runtime 新进展 首先是关于调度部分的性能优化。Flink 由于存在 all to all 的连接关系,两个并发为 n 的算子之间会有 n² 条边,这
在Fink源码中,有flink-stream-java和flink-stream-scala模块。 flink streaming 为什么需要两个模块? https://github.com/apac
我的要求是在一天内流式传输数百万条记录,并且它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式传输。这些是应
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
1.概述 转载:Flink 源码阅读笔记(4)- RPC 相关文章: 【Flink】Flink 源码之RPC调用 Flink】FLink 通讯组件 RPC 作为一个分布式系统,Flink 内部不同组件
1.概述 转载并且补充: flink keyby 分布不均匀问题 我使用随机数random.nextint(8)作为key,生成keyedstream之后,直接sink到存储中,但是sink算子只有四
1.概述 转载:Flink Sort-Shuffle写流程简析 转载并且补充。 2.配置 taskmanager.network.sort-shuffle.min-parallelism 核心配置。设
1.概述 转载:Flink源码分析——批处理模式Map端数据聚合 在flink的批处理模式下,数据的计算也有着map/reduce两端的计算模型,这一点和MR、spark计算框架是类似的。在数据进行分
1.概述 转载:Flink on yarn 远程调试 大家好,我是 JasonLee。 前几天有小伙伴问我,我写的 Flink 代码是提交到 yarn 上去运行的,那我怎么能远程调试代码呢?在本地调试
当我使用 flink 事件时间窗口时,窗口就是不触发。请问如何解决,有什么debug的方法吗? 最佳答案 由于您使用的是事件时间窗口,所以很可能是水印问题。该窗口仅在水印取得进展时输出。事件时间没有提
我有一个用例,我想在 Flink 上运行 2 个独立的处理流程。所以 2 个流程看起来像 Source1 -> operator1 -> Sink1 Source2 -> operator2 -> S
我们正在尝试构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。通过阅读文档,在我看来,Flink 广播状态很适合这种情况。 作为实验,我构建了一个简化版本:假设我有一
我有一个 Flink Streaming 作业,它失败了,我得到如下日志。谁能告诉我如何解决这个问题?有时运行一天就失效,有时运行几个小时就失效。 09:30:25 948 INFO (org.ap
我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafk
我是一名优秀的程序员,十分优秀!