- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
Flink 通常处理流式、无限数据集的计算引擎,窗口是一种把无限流式数据集切割成有限的数据集进行计算。window窗口在Flink中极其重要。
window 注意分为两大类型:CountWindow 和 TimeWindow
CountWindow 是与时间没有关系的,比如 数据收集的一定大小(1w)的时候就会触发窗口函数进行计算。
TimeWindow 就是时间窗口,它与时间非常紧密。主要分为三大类:
滚动窗口(Tumbling window)、滑动窗口(Sliding window)、回话窗口(Session window)。
特征:时间对齐,没有重叠,并且时间窗口大小固定
比如:计算五分钟内的数据。窗口起始时间假设为 10:00 那么时间到了 10:05 时刻就会触发 10:00 - 10:05 段时间窗口,注意区间是左闭右开。
与滚动窗口不同的是,滑动窗口多了滑动间隔时间,那么就会出现数据的重叠或者数据丢失。如果滑动时间间隔小于滑动窗口大小,那么就是出现数据的重叠,也就是重叠的数据可能被多个窗口计算;如果滑动时间间隔等于滑动窗口大小,那么就相等于滚动窗口;如果滑动时间间隔大于滑动时间窗口,那么就会出现数据的丢失。
比如:每隔1分钟计算五分钟内的数据,假设窗口起始时间为 10:00,那么就会触发 9:55 - 10:00 段的时间窗口,时间到了 10:01,那么就会触发 9:56 - 10:01 段的时间窗口,以此类推,注意窗口都是左闭右开的。
类似于Web应用的 Session 回话,简单一句话,在一定时间内没有接收到任务数据,那么上一次窗口就触发计算。
比如:5分钟内没有收到任何数据,就触发上一次窗口计算,并重新开一个窗口。假设接收到第一条数据的时间是 9:54(时间任意),目前窗口时间为 10:00 ,那么到了 10:05 并且在 10:00 - 10:05 段时间内没有接收任何数据,就会触发 9:54 - 10:00 段的窗口计算,并重新以 10:05 开一个新的窗口,往后有数据以此类推,如果还是在 10:05 - 10:10 没有数据,其实计不计算都显得不重要了。主要的特征是触发窗口时间不固定,窗口时间也不固定。
windwo API 核心是窗口适配器(windowAssigner)。windwo(入参) 的入参接受的就是窗口适配器,它负责把数据分发到正确的 window中。但是 Flink 已经提供好了通用的窗口适配器。
Flink 默认的时间窗口是根据 processing time(处理时间)进行划分和计算。但是应用大多数都是 Event time(事件时间),后面具体介绍,目前window API 默认使用 processing time。窗口还可以分为不同流窗口(也就是 keyby之后的数据)和全窗口(也就是全部数据)。
计算传感器五秒之内的最大温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Tumbling {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L)) // 滚动时间窗口大小
.maxBy("temperature");
resultDataStream.print();
env.execute();
}
}
每隔2秒计算传感器5秒内的最小温度值
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L), Time.seconds(2)) // 窗口大小5秒 滑动大小2秒
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L), Time.seconds(2)) // 窗口大小5秒 滑动大小2秒
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L), Time.seconds(2)) // 窗口大小5秒 滑动大小2秒
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
根据传感器id每传来3条数据就计算这些数据的最大温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CountWindow_Tumbling {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.countWindow(3)
.maxBy("temperature");
resultDataStream.print();
env.execute();
}
}
根据传感器id每隔2条数据计算最近5条数据的最小温度值
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CountWindow_Sliding {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.countWindow(5, 2)
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}
windwo function 主要针对触发窗口计算的操作,主要分为:
增量聚合函数、全窗口函数、其他可选 API。
简单地说就是,每来一条数据就计算,相当于预聚合,保持一个简单的状态。典型的增量聚合函数有 ReduceFunction、AggregateFunction。
先把全窗口的数据收集起来,等到计算的时候遍历计算。典型的全窗口函数 ProcessWindowFunction、windowFunction。它与增量聚合不同的是,增量聚合来一条数据就会先预聚合,等到窗口触发计算函数;全窗口函数是等窗口函数触发收集该窗口的全部数据一起计算。如果一个窗口数据突然特别多,那么就会造成压力;可以选择增量聚合函数。
触发器它定义 窗口什么时候关闭,触发计算并输出结果
定义移除某些数据的逻辑
可能某种原因(网络延迟)导致这些数据所在窗口已经触发了计算,所有这些数据可以允许迟到,但是这些数据不会加入其他窗口进行计算,而是输出到侧输出流进行计算。
将迟到的数据放入侧输出流
根据传感器id计算5秒内的的某时刻最大温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowFunction_ReduceFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(5L))
.reduce(new CustomReduceFunction(), new CustomWindowFunctiom()); // 第一个参数就是每来一条数据就进行计算 第二个参数就是最终窗口触发计算
resultDataStream.print();
env.execute();
}
// 输入数据的泛性
public static class CustomReduceFunction implements ReduceFunction<SensorReading> {
@Override
public SensorReading reduce(SensorReading sensorReading, SensorReading input) throws Exception {
if (sensorReading.getTemperature() > input.getTemperature()) {
return sensorReading;
} else {
return input;
}
}
}
/**
* SensorReading -> 输入数据类型
* String -> 输出类型
* String -> key 类型
* TimeWindow -> 窗口类型(TimeWindow 和 GlobalWindow)
*/
public static class CustomWindowFunctiom implements WindowFunction<SensorReading, String, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<SensorReading> iterable, Collector<String> collector) throws Exception {
iterable.iterator().forEachRemaining(sensor -> {
// 拼接输出字符串
String output = sensor.getId() + "在 " + sensor.getTimestamp() + "最大温度是:" + sensor.getTemperature();
collector.collect(output);
});
}
}
}
根据传感器id计算10秒内的平均温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowFunction_AggregateFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(10L))
.aggregate(new CustomAggregateFunction(), new CustomWindowFunction());
resultDataStream.print();
env.execute();
}
/**
* SensorReading -> 输入类型
* Tuple2<Long, Double> -> 保存中间结果状态 也就是 来一条数据 Long + 1 Double + temperature
* Tuple2<Long, Double> -> 输出结果
*/
public static class CustomAggregateFunction implements AggregateFunction<SensorReading, Tuple2<Long, Double>, Tuple2<Long, Double>> {
// 初始化中间状态
@Override
public Tuple2<Long, Double> createAccumulator() {
return new Tuple2<>(0L, 0.0);
}
// 累计输入的数据
@Override
public Tuple2<Long, Double> add(SensorReading input, Tuple2<Long, Double> tuple2) {
tuple2.setFields(tuple2.f0 + 1, tuple2.f1 + input.getTemperature());
return tuple2;
}
// 返回
@Override
public Tuple2<Long, Double> getResult(Tuple2<Long, Double> tuple2) {
return tuple2;
}
// 区间累加
@Override
public Tuple2<Long, Double> merge(Tuple2<Long, Double> input1, Tuple2<Long, Double> input2) {
Tuple2<Long, Double> result = new Tuple2<>(input1.f0 + input2.f0, input1.f1 + input2.f1);
return result;
}
}
/**
* Tuple2<Long,Double> -> 输入结果 也就是 CustomAggregateFunction 中的输出结果
* Tuple -> key 类型
* TimeWindow -> 窗口类型
*/
public static class CustomWindowFunction implements WindowFunction<Tuple2<Long,Double>,String, Tuple, TimeWindow>{
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, Double>> iterable, Collector<String> collector) throws Exception {
Tuple2<Long, Double> result = iterable.iterator().next();
double avgTemp = result.f1 / result.f0;
String resultStr = tuple + "在10秒内的平均温度为:" + avgTemp;
collector.collect(resultStr);
}
}
}
根据传感器id计算10秒内的平均温度
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Iterator;
public class WindowFunction_ProcessWindowFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy("id")
.timeWindow(Time.seconds(10L))
.process(new CustomProcessWindowFunction());
resultDataStream.print();
env.execute();
}
/**
* SensorReading -> 输入类型
* String -> 输出类型
* Tuple -> key 类型
* TimeWindow -> 时间窗口
*/
public static class CustomProcessWindowFunction extends ProcessWindowFunction<SensorReading, String, Tuple, TimeWindow> {
@Override
public void process(Tuple tuple, Context context, Iterable<SensorReading> iterable, Collector<String> collector) throws Exception {
long count = 0;
double tempTotal = 0.0;
Iterator<SensorReading> allDatas = iterable.iterator();
while (allDatas.hasNext()) {
SensorReading sensor = allDatas.next();
count++;
tempTotal += sensor.getTemperature();
}
double avgTemp = tempTotal / count;
String resultStr = tuple + "在10秒内平均温度为:" + avgTemp;
collector.collect(resultStr);
}
}
}
https://github.com/mattdiamond/Recorderjs/blob/master/recorder.js中的代码 我不明白 JavaScript 语法,比如 (functio
在 iOS 7 及更早版本中,如果我们想在应用程序中找到 topMostWindow,我们通常使用以下代码行 [[[UIApplication sharedApplication] windows]
我已经尝试解决这个问题很长一段时间了:我无法访问窗口的 url,因为它位于另一个域上..有一些解决方案吗? function login() { var cb = window.ope
是否可以将 FFMPEG 视频流传递到 C# 窗口?现在它在新窗口中作为新进程打开,我只是想将它传递给我自己的 SessionWindow。 此时我像这样执行ffplay: public void E
我有一个名为 x 的矩阵看起来像这样: pTime Close 1 1275087600 1.2268 2 1275264000 1.2264 3 1275264300 1.2
在编译时,发生搜索,grep搜索等,Emacs会在单独的窗口中创建一个新的缓冲区来显示结果,有没有自动跳转到那个窗口的方法?这很有用,因为我可以使用 n 和 p 而不是 M-g n 和 M-g p 移
我有一个启动 PowerShell 脚本的批处理文件。 批处理文件: START Powershell -executionpolicy RemoteSigned -noexit -file "MyS
我有一个基于菜单栏的应用程序,单击图标时会显示一个窗口。在 Mac OS X Lion 上一切正常,但由于某种原因,在 Snow Leopard 和早期版本的 Mac OS X 上会出现错误。任何时候
在 macOS 中,如何在 Xcode 和/或 Interface Builder 中创建带有“集成标题栏和工具栏”的窗口? 这是“宽标题栏”类型的窗口,已添加到 OS X 10.10 Yosemit
在浏览器 (Chrome) 中 JavaScript: var DataModler = { Data: { Something: 'value' }, Process: functi
我有 3 个 html 页面。第 1 页链接到第 2 页,第 2 页链接到第 3 页(为了简单起见)。 我希望页面 2 中的链接打开页面 3 并关闭页面 1(选项卡 1)。 据我了解,您无法使用 Ja
当点击“创建节点”按钮时,如何打开一个新的框架或窗口?我希望新框架包含一个文本字段和下拉菜单,以便用户可以选择一个选项。 Create node Search node
我有一个用户控件,用于编辑应用程序中的某些对象。 我最近遇到一个实例,我想弹出一个新的对话框(窗口)来托管此用户控件。 如何实例化新窗口并将需要设置的任何属性从窗口传递到用户控件? 感谢您的宝贵时间。
我有一个Observable,它发出许多对象,我想使用window或buffer操作对这些对象进行分组。但是,我不想指定count参数来确定窗口中应包含多少个对象,而是希望能够使用自定义条件。 例如,
我有以下代码,它打开一个新的 JavaFX 阶段(我们称之为窗口)。 openAlertBox.setOnAction(e -> { AlertBox alert = AlertBox
我要添加一个“在新窗口中打开”上下文菜单项,该菜单项将以新的UIScene打开我的应用程序文档之一。当然,我只想在实际上支持多个场景的设备上显示该菜单项。 目前,我只是在检查设备是否是使用旧设备的iP
我正在尝试创建一个 AIR 应用程序来记录应用程序的使用情况,使用 AIR 从系统获取信息的唯一简单方法是使用命令行工具和抓取 标准输出 . 我知道像 这样的工具顶部 和 ps 对于 OS X,但它们
所以我有这个简单的 turtle 螺旋制作器,我想知道是否有一种方法可以打印出由该程序创建的我的设计副本。 代码: import turtle x= float(input("Angle: ")) y
我正在编写一个 C# WPF 程序,它将文本消息发送到另一个程序的窗口。我有一个宏程序作为我的键盘驱动程序 (Logitech g15) 的一部分,它已经这样做了,尽管它不会将击键直接发送到进程,而是
我尝试使用以下代码通过 UDP 发送,但得到了奇怪的结果。 if((sendto(newSocket, sendBuf, totalLength, 0, (SOCKADDR *)&sendAd
我是一名优秀的程序员,十分优秀!