- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
利用global window + trigger 计算单词出现三次统计一次(有点像CountWindow)
某台虚拟机或者mac 终端输入:nc -lk 9999
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;
public class WindowFunction_Global_Trigger {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> inputDataStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new CustomFlatMap())
.keyBy(0)
.window(GlobalWindows.create()) // 如果不调用trigger 那么程序一直处于数据收集阶段 无法触发计算
.trigger(CountTrigger.of(3))
.sum(1);
resultDataStream.print();
env.execute();
}
public static class CustomFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
}
终端输入:
hello flink
hello spark
hello hive
控制台打印:(hello,3)
利用global window 自定义一个CountWindow,也是单词出现3次统计一次
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
public class WindowFunction_CustomTrigger {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> inputDataStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new CustomFlatMap())
.keyBy(0)
.window(GlobalWindows.create())
.trigger(new CustomTrigger(3))
.sum(1);
resultDataStream.print();
env.execute();
}
public static class CustomFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
public static class CustomTrigger extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
private long maxCount;
public CustomTrigger(long count) {
this.maxCount = count;
}
// 定义一个状态保存 每个 key 对应的 count 值 (涉及到状态编程 后面会具体介绍)
private ReducingStateDescriptor<Long> stateDescriptor = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
@Override
public Long reduce(Long input1, Long input2) throws Exception {
return input1 + input2;
}
}, Long.class);
/**
* 每来一条数据都会执行
*
* @param input 输入类型
* @param timestamp 处理时间戳
* @param globalWindow 全窗口类型(所属窗口)
* @param triggerContext trigger 上下文
* @return TriggerResult
* 1. TriggerResult.CONTINUE :表示对 window 不做任何处理
* 2. TriggerResult.FIRE :表示触发 window 的计算
* 3. TriggerResult.PURGE :表示清除 window 中的所有数据
* 4. TriggerResult.FIRE_AND_PURGE :表示先触发 window 计算,然后删除 window 中的数据
* @throws Exception
*/
@Override
public TriggerResult onElement(Tuple2<String, Integer> input, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 获取 key 对应之前 count 状态值
ReducingState<Long> count = triggerContext.getPartitionedState(stateDescriptor);
// 每来一条数据 累加 1
count.add(1L);
if (maxCount == count.get()) {
// 如果已经达到预期的count
// 1 清除 count 状态
count.clear();
// 2 先触发计算 再清空窗口的数据
return TriggerResult.FIRE_AND_PURGE;
}
// 3 否则不做任务处理
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 基于 processingTime 定时器处理逻辑
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 基于 EventTime 定时器处理逻辑
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 清理状态
triggerContext.getPartitionedState(stateDescriptor).clear();
}
}
}
终端输入:
world spark
world flink
world hive
控制台打印:(world,3)
利用global window + trigger + evictor 实现每个2个单词统计最近3个单词
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;
import java.util.Iterator;
public class WindowFuncton_CustomEvictor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> inputDataStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new CustomFlatMap())
.keyBy(0)
.window(GlobalWindows.create())
.trigger(new CustomTrigger(2)) // 每来2条数据触发后面的计算
.evictor(new CustomEvictor(3))
.sum(1);
resultDataStream.print();
env.execute();
}
public static class CustomFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
public static class CustomTrigger extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
private long maxCount;
public CustomTrigger(long count) {
this.maxCount = count;
}
// 定义一个状态保存 每个 key 对应的 count 值 (涉及到状态编程 后面会具体介绍)
private ReducingStateDescriptor<Long> stateDescriptor = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
@Override
public Long reduce(Long input1, Long input2) throws Exception {
return input1 + input2;
}
}, Long.class);
/**
* 每来一条数据都会执行
*
* @param input 输入类型
* @param timestamp 处理时间戳
* @param globalWindow 全窗口类型(所属窗口)
* @param triggerContext trigger 上下文
* @return TriggerResult
* 1. TriggerResult.CONTINUE :表示对 window 不做任何处理
* 2. TriggerResult.FIRE :表示触发 window 的计算
* 3. TriggerResult.PURGE :表示清除 window 中的所有数据
* 4. TriggerResult.FIRE_AND_PURGE :表示先触发 window 计算,然后删除 window 中的数据
* @throws Exception
*/
@Override
public TriggerResult onElement(Tuple2<String, Integer> input, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 获取 key 对应之前 count 状态值
ReducingState<Long> count = triggerContext.getPartitionedState(stateDescriptor);
// 每来一条数据 累加 1
count.add(1L);
if (maxCount == count.get()) {
// 如果已经达到预期的count
// 1 清除 count 状态
count.clear();
// 2 先触发计算 不清空窗口的数据
return TriggerResult.FIRE;
}
// 3 否则不做任务处理
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 基于 processingTime 定时器处理逻辑
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 基于 EventTime 定时器处理逻辑
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 清理状态
triggerContext.getPartitionedState(stateDescriptor).clear();
}
}
public static class CustomEvictor implements Evictor<Tuple2<String, Integer>, GlobalWindow> {
// 定义窗口的数据大小
private long windowCount;
public CustomEvictor(long windowCount) {
this.windowCount = windowCount;
}
/**
* @param iterable 当前窗口的全部数据 (可以认为这些数据是有顺序的(相对队列))
* @param size 当前窗口的数据大小
* @param globalWindow
* @param evictorContext 上下文
*/
@Override
public void evictBefore(Iterable<TimestampedValue<Tuple2<String, Integer>>> iterable, int size, GlobalWindow globalWindow, EvictorContext evictorContext) {
// 如果输入数据窗口大小等于指定窗口大小 没有数据可以移除
if (windowCount == size) {
return;
} else {
// 临时 count 用来判断移除哪些数据
int evictorCount = 0;
Iterator<TimestampedValue<Tuple2<String, Integer>>> iterator = iterable.iterator();
while (iterator.hasNext()) {
iterator.next();
evictorCount++;
// 判断什么时候可以移除哪些数据
/**
* 比如当前窗口共有5条数据 统计最近3条数据 移除2条数据
* evictorCount = 1 size = 5 windowCount = 3 (需要移除当前遍历数据)
* evictorCount = 2 size = 5 windowCount = 3 (需要移除当前遍历数据)
* evictorCount = 3 size = 5 windowCount = 3 (不需要移除当前遍历数据)
* ...
*/
if (evictorCount > size - windowCount) {
break;
} else {
iterator.remove();
}
}
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<Tuple2<String, Integer>>> iterable, int i, GlobalWindow globalWindow, EvictorContext evictorContext) {
}
}
}
终端输入:
flink A
flink B
(此时控制台已经打印(flink,2) )
flink C
flink D
(此时控制台已经答应 (flink,3))
flink E
flink F
(此时控制台已经答应 (flink,3))
以此类推
迟到数据:这些数据根据 EventTime已经分配好了所在窗口执行,但是所在窗口已经执行计算了,并且这些数据不会被当前窗口所收集和计算。
侧输出流:把迟到的数据放入到侧输出流中,单独进行计算。
由于该案例涉及到 EventTime(日志产生时间)、ingestTime(数据进入程序时间)、processTime(数据处理时间)的关系和 watermark(水位线)的概念描述,该案例放到下面具体介绍。
OkHttp的作用 OkHttp is an HTTP client。 如果是HTTP的方式想得到数据,就需要我们在页面上输入网址,如果网址没有问题,就有可能返回对应的String字符串,如果这个地址
Record 一个重要的字符串算法,这是第三次复习。 通过总结我认为之所以某个算法总是忘记,是因为大脑始终没有认可这种算法的逻辑(也就是脑回路)。 本篇主要讲解从KMP的应用场景,
SQL 注入基础 【若本文有问题请指正】 有回显 回显正常 基本步骤 1. 判断注入类型 数字型 or 字符型 数字型【示例】:
标签: #Prompt #LLM 创建时间:2023-04-28 17:05:45 链接: 课程(含JupyterNotebook) , 中文版 讲师: An
Swift是供iOS和OS X应用编程的新编程语言,基于C和Objective-C,而却没有C的一些兼容约束。Swift采用了安全的编程模式和添加现代的功能来是的编程更加简单、灵活和有趣。界面则基于
VulnStack-红日靶机七 概述 在 VulnStack7 是由 5 台目标机器组成的三层网络环境,分别为 DMZ 区、第二层网络、第三层网络。涉及到的知识点也是有很多,redis未授权的利用
红日靶机(一)笔记 概述 域渗透靶机,可以练习对域渗透的一些知识,主要还是要熟悉 powershell 语法,powershell 往往比 cmd 的命令行更加强大,而很多渗透开源的脚本都是 po
八大绩效域详细解析 18.1 干系人绩效域 跟干系人所有相关的活动. 一、预期目标 ①与干系人建立高效的工作关系 ②干系人认同项目目标 ③支持项目的干系人提高
18.3 开发方法和生命周期绩效域 跟开发方法,项目交付节奏和生命周期相关的活动和职能. 一、预期目标: ①开发方法与项目可交付物相符合; ②将项目交付与干系人价值紧密
18.7 度量绩效域 度量绩效域涉及评估项目绩效和采取应对措施相关的活动和职能度量是评估项目绩效,并采取适当的应对措施,以保持最佳项目绩效的过程。 一、 预期目标: ①对项目状况
pygraphviz 安装,windows系统: 正确的安装姿势: Prebuilt-Binaries/PyGraphviz at master · CristiFati/Prebuilt-Binar
今天给大家介绍IDEA开发工具如何配置devtools热加载工具。 1、devtools原理介绍 spring-boot-devtools是spring为开发者提供的热加载
一 什么是正则表达式 // 正则表达式(regular expression)是一个描述字符模式的对象; // JS定义RegExp类表示正则表达式; // String和RegExp都定义了使用
目前是2022-04-25 23:48:03,此篇博文分享到互联网上估计是1-2个月后的事了,此时的OpenCV3最新版是3.4.16 这里前提是gcc,g++,cmake都需要安装好。 没安装好的,
一、概述 1、Flink 是什么 Apache Flink is a framework and distributed processing engine for stateful comput
一、window 概述 Flink 通常处理流式、无限数据集的计算引擎,窗口是一种把无限流式数据集切割成有限的数据集进行计算。window窗口在Flink中极其重要。 二、window 类型 w
一、触发器(Trigger) 1.1、案例一 利用global window + trigger 计算单词出现三次统计一次(有点像CountWindow) 某台虚拟机或者mac 终端输入:nc -
一、时间语义 在Flink 中涉及到三个重要时间概念:EventTime、IngestionTime、ProcessingTime。 1.1、EventTime EventTime 表示日志事
一、概述 以wordcount为例,为什么每次输入数据,flink都能统计每个单词的总数呢?我们都没有显示保存每个单词的状态值,但是每来一条数据,都能计算单词的总数。事实上,flink在底层维护了每
一、概述 checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状
我是一名优秀的程序员,十分优秀!