- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
之前的转化算子是无法访问事件的时间戳信息和水位线watermark,但是,在某些情况下,显得很重要。Flink 提供了 DataStream API 的Low- Level转化算子。比如说可以访问事件时间戳、watermark、以及注册定时器,还可以输出一些特定的事件,比如超时事件等。Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window 函数和转换算子无法实现)。例如,Flink SQL 就是使用 Process Function 实现的。
Flink 提供了 8 个 Process Function:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
KeyedProcessFunction 用来操作 KeyedStream。KeyedProcessFunction 会处理流的每一个元素,输出为 0 个、1 个或者多个元素。所有的 Process Function 都继承自RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。
而KeyedProcessFunction<K, I, O>还额外提供了两个方法:
processElement(I value, Context ctx, Collector out):流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。
onTimer(long timestamp, OnTimerContext ctx, Collector out):
是一个回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:
long currentProcessingTime() 返回当前处理时间
long currentWatermark() 返回当前 watermark 的时间戳
void registerProcessingTimeTimer(long timestamp) 会注册当前 key 的processing time 的定时器。当 processing time 到达定时时间时,触发 timer。
void registerEventTimeTimer(long timestamp) 会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
void deleteProcessingTimeTimer(long timestamp) 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
void deleteEventTimeTimer(long timestamp) 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在keyed streams 上面使用。
监控温度传感器的温度值,如果温度值在 10 秒钟之内(processing time)连续上升,则报警。
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessFunction_KeyedProcessFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为了测试效果 用默认的 时间特征
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.getConfig().setAutoWatermarkInterval(500L);
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<String> resultDataStream = inputDataStream.keyBy(SensorReading::getId)
.process(new CustomKeyedProcessFunction(10));
resultDataStream.print();
env.execute();
}
/**
* String -> key 类型
* SensorReading -> 输入类型
* String -> 输出类型
*/
public static class CustomKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {
// 时间间隔
private Integer internal;
public CustomKeyedProcessFunction(Integer internal) {
this.internal = internal;
}
// 上一条数据的传感器温度(状态编程在下面具体介绍)
private ValueState<Double> lastTemperatureState;
// 定时器的时间戳
private ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception {
/**
* "last-temp" -> 当前状态变量的名称
* Double.class -> 当前状态变量的类型
* Double.MIN_VALUE -> 当前状态变量的初始值
*/
lastTemperatureState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class, 0.0d));
timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("last-timer-ts", Long.class, 0L));
}
@Override
public void processElement(SensorReading input, Context context, Collector<String> collector) throws Exception {
// 1、获取上一次状态值
Double lastTemperature = lastTemperatureState.value();
Long timerState = timerTsState.value();
// 2、更新温度状态
lastTemperatureState.update(input.getTemperature());
// 3、比较上一次温度
if (input.getTemperature() > lastTemperature && timerState == 0) {
// first data
long timeTs = context.timerService().currentProcessingTime() + internal * 1000L;
// 注册定时器
context.timerService().registerProcessingTimeTimer(timeTs);
// 更新定时器状态值
timerTsState.update(timeTs);
} else if (input.getTemperature() < lastTemperature && timerState != 0) {
// 当前温度小于上一次温度 并且定时器不为null
// 删除定时器
context.timerService().deleteProcessingTimeTimer(timerState);
// 清除定时状态变量
timerTsState.clear();
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 如果触发定时器函数 说明该传感器在10s内温度连续上升,需要预警
String key = ctx.getCurrentKey();
String resultStr = "传感器ID为:" + key + "在10s内温度连续上升...";
out.collect(resultStr);
// 清空定时器值
timerTsState.clear();
}
}
}
大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process
function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。
根据传感器温度,将低于60度的数据输入到侧输出流
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SideOutput_Demo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
// 定义获取侧输出流
final OutputTag<String> outputTag = new OutputTag<String>("side-out-put"){
};
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.process(new CustomSideOutput(outputTag));
DataStream<String> sideOutputDataStream = resultDataStream.getSideOutput(outputTag);
sideOutputDataStream.print("low > ");
resultDataStream.print();
env.execute();
}
public static class CustomSideOutput extends ProcessFunction<SensorReading, SensorReading> {
private OutputTag<String> outputTag;
public CustomSideOutput(OutputTag<String> outputTag) {
this.outputTag = outputTag;
}
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<SensorReading> collector) throws Exception {
if (sensorReading.getTemperature() < 60) {
String msg = sensorReading.getId() + " 的温度低于60度 -> " + sensorReading.getTemperature();
context.output(outputTag, msg);
}
collector.collect(sensorReading);
}
}
}
OkHttp的作用 OkHttp is an HTTP client。 如果是HTTP的方式想得到数据,就需要我们在页面上输入网址,如果网址没有问题,就有可能返回对应的String字符串,如果这个地址
Record 一个重要的字符串算法,这是第三次复习。 通过总结我认为之所以某个算法总是忘记,是因为大脑始终没有认可这种算法的逻辑(也就是脑回路)。 本篇主要讲解从KMP的应用场景,
SQL 注入基础 【若本文有问题请指正】 有回显 回显正常 基本步骤 1. 判断注入类型 数字型 or 字符型 数字型【示例】:
标签: #Prompt #LLM 创建时间:2023-04-28 17:05:45 链接: 课程(含JupyterNotebook) , 中文版 讲师: An
Swift是供iOS和OS X应用编程的新编程语言,基于C和Objective-C,而却没有C的一些兼容约束。Swift采用了安全的编程模式和添加现代的功能来是的编程更加简单、灵活和有趣。界面则基于
VulnStack-红日靶机七 概述 在 VulnStack7 是由 5 台目标机器组成的三层网络环境,分别为 DMZ 区、第二层网络、第三层网络。涉及到的知识点也是有很多,redis未授权的利用
红日靶机(一)笔记 概述 域渗透靶机,可以练习对域渗透的一些知识,主要还是要熟悉 powershell 语法,powershell 往往比 cmd 的命令行更加强大,而很多渗透开源的脚本都是 po
八大绩效域详细解析 18.1 干系人绩效域 跟干系人所有相关的活动. 一、预期目标 ①与干系人建立高效的工作关系 ②干系人认同项目目标 ③支持项目的干系人提高
18.3 开发方法和生命周期绩效域 跟开发方法,项目交付节奏和生命周期相关的活动和职能. 一、预期目标: ①开发方法与项目可交付物相符合; ②将项目交付与干系人价值紧密
18.7 度量绩效域 度量绩效域涉及评估项目绩效和采取应对措施相关的活动和职能度量是评估项目绩效,并采取适当的应对措施,以保持最佳项目绩效的过程。 一、 预期目标: ①对项目状况
pygraphviz 安装,windows系统: 正确的安装姿势: Prebuilt-Binaries/PyGraphviz at master · CristiFati/Prebuilt-Binar
今天给大家介绍IDEA开发工具如何配置devtools热加载工具。 1、devtools原理介绍 spring-boot-devtools是spring为开发者提供的热加载
一 什么是正则表达式 // 正则表达式(regular expression)是一个描述字符模式的对象; // JS定义RegExp类表示正则表达式; // String和RegExp都定义了使用
目前是2022-04-25 23:48:03,此篇博文分享到互联网上估计是1-2个月后的事了,此时的OpenCV3最新版是3.4.16 这里前提是gcc,g++,cmake都需要安装好。 没安装好的,
一、概述 1、Flink 是什么 Apache Flink is a framework and distributed processing engine for stateful comput
一、window 概述 Flink 通常处理流式、无限数据集的计算引擎,窗口是一种把无限流式数据集切割成有限的数据集进行计算。window窗口在Flink中极其重要。 二、window 类型 w
一、触发器(Trigger) 1.1、案例一 利用global window + trigger 计算单词出现三次统计一次(有点像CountWindow) 某台虚拟机或者mac 终端输入:nc -
一、时间语义 在Flink 中涉及到三个重要时间概念:EventTime、IngestionTime、ProcessingTime。 1.1、EventTime EventTime 表示日志事
一、概述 以wordcount为例,为什么每次输入数据,flink都能统计每个单词的总数呢?我们都没有显示保存每个单词的状态值,但是每来一条数据,都能计算单词的总数。事实上,flink在底层维护了每
一、概述 checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状
我是一名优秀的程序员,十分优秀!