- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
以wordcount为例,为什么每次输入数据,flink都能统计每个单词的总数呢?我们都没有显示保存每个单词的状态值,但是每来一条数据,都能计算单词的总数。事实上,flink在底层维护了每个 key的状态,就是state。比较于Spark,Spark如果没有显示保存其中的状态,它会统计当前批次的单词次数,也就是没有了历史总数,这就相当于,来一条数据我就处理,不管之前的数据,这就是无状态。总之,状态在Flink编程中显得极其重要,也是新一代实时流式处理框架的核心。
state:一般指一个具体的task/operator的状态。State可以被记录,在失败的情况下数据还可以恢复,Flink中有两种基本类型的State:Keyed State,Operator State,他们两种都可以以两种形式存在:原始状态(raw state)和托管状态(managed state)。
托管状态:由Flink框架管理的状态,我们通常使用的就是这种。
原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑它。
下图是保存状态流程:
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有
数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
state是task级别的state,说白了就是每个task对应一个state。
Flink 为算子状态提供三种基本数据结构:
简单来说,就是用一个列表集合保存当前task的状态。
也是将当前task的状态保存到列表集合中,也普通的列表状态不同的是,当发生故障或者利用检查点(Checkpoint)启动应用程序的时候,就可以利用联合列表状态恢复。
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。相当于 Spark 的广播机制。
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。
Flink 的 Keyed State 支持以下数据类型:
ValueState[T] 保存单个值,数据类型是T
get操作: ValueState.value()
set操作: ValueState.update(T value)
ListState[T] 保存列表,数据类型是T
添加一个元素: ListState.add(T value)
添加多个元素:ListState.addAll(List values)
获取全部数据:ListState.get()返回 Iterable
更新全部数据:ListState.update(List values)
MapState<K, V>保存 Key-Value 对状态
获取一个 key 的value:MapState.get(UK key)
添加一个key-value:MapState.put(UK key, UV value)
判断是否包含一个key:MapState.contains(UK key)
移除一个key:MapState.remove(UK key)
第一个聚合状态:ReducingState[T] 之前有所代码实现过。
第二个聚合状态:AggregatingState<I, O>
State.clear()是清空操作
根据传感器id每接收到三条数据就计算平均温度并输出。
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class State_ValueState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
inputDataStream.keyBy(SensorReading::getId)
.flatMap(new CustomFlatMapFunction())
.print();
env.execute();
}
/**
* RichFlatMapFunction -> FlatMapFunction 的富函数 有生命周期的
* SensorReading -> 输入类型
* Tuple2<String,Double> -> 输出类型
*/
public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
private ValueState<Tuple2<Long, Double>> valueState;
@Override
public void open(Configuration conf) throws Exception {
// 初始化
valueState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple2<Long, Double>>("value-state", Types.TUPLE(Types.LONG, Types.DOUBLE)));
}
@Override
public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
Tuple2<Long, Double> lastState = valueState.value();
// 没有初始化
if (lastState == null) {
lastState = Tuple2.of(0L, 0.0d);
}
lastState.f0 += 1;
lastState.f1 += sensorReading.getTemperature();
valueState.update(lastState);
if (lastState.f0 >= 3) {
double avg = lastState.f1 / lastState.f0;
collector.collect(new Tuple2<>(sensorReading.getId(), avg));
valueState.clear();
}
}
}
}
根据传感器id每接收到三条数据就计算平均温度并输出
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Collections;
public class State_ListState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
inputDataStream.keyBy(SensorReading::getId)
.flatMap(new CustomFlatMapFunction())
.print();
env.execute();
}
public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
private ListState<Tuple2<String, Double>> listState;
@Override
public void open(Configuration parameters) throws Exception {
listState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Double>>("list-state", Types.TUPLE(Types.STRING, Types.DOUBLE)));
}
@Override
public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
Iterable<Tuple2<String, Double>> lastListState = listState.get();
// 还没有初始化
if (lastListState == null) {
listState.addAll(Collections.emptyList());
}
// 添加元素
listState.add(new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature()));
// 判断
ArrayList<Tuple2<String, Double>> listTuples = Lists.newArrayList(listState.get());
if (listTuples.size() >= 3) {
long count = listTuples.size();
double tempTotal = 0.0;
for (Tuple2<String, Double> tuple2 : listTuples) {
tempTotal += tuple2.f1;
}
double avg = tempTotal / count;
collector.collect(new Tuple2<>(sensorReading.getId(), avg));
listState.clear();
}
}
}
}
根据传感器id每接收到三条数据就计算平均温度并输出
public class State_MapState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
inputDataStream.keyBy(SensorReading::getId)
.flatMap(new CustomFlatMapFunction())
.print();
env.execute();
}
public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
private MapState<String, Double> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>("map-state", String.class, Double.class));
}
@Override
public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
mapState.put(UUID.randomUUID().toString().substring(0, 8), sensorReading.getTemperature());
if (Lists.newArrayList(mapState.keys()).size() >= 3) {
int count = 0;
double tempTotal = 0.0;
for (Double temp : Lists.newArrayList(mapState.values())) {
count++;
tempTotal += temp;
}
double avg = tempTotal / count;
collector.collect(new Tuple2<String, Double>(sensorReading.getId(), avg));
mapState.clear();
}
}
}
}
根据传感器id统计所有温度的总和
public class State_ReducingState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
inputDataStream.keyBy(SensorReading::getId)
.flatMap(new CustomFlatMapFunction())
.print();
env.execute();
}
public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
private ReducingState<Double> reducingState;
@Override
public void open(Configuration parameters) throws Exception {
reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Double>("reducing-state", new ReduceFunction<Double>() {
@Override
public Double reduce(Double input1, Double input2) throws Exception {
return input1 + input2;
}
}, Double.class));
}
@Override
public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
reducingState.add(sensorReading.getTemperature());
collector.collect(new Tuple2<>(sensorReading.getId(), reducingState.get()));
}
}
}
根据传感器id聚合性输出所有温度数据(持续性)
import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class State_AggregatingState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
inputDataStream.keyBy(SensorReading::getId)
.flatMap(new CustomFlatMapFunction())
.print();
env.execute();
}
public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, String>> {
private AggregatingState<SensorReading, String> aggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
/**
* SensorReading -> 输入类型
* String -> 累加器
* String -> 输出类型
*/
AggregatingStateDescriptor<SensorReading, String, String> descriptor = new AggregatingStateDescriptor<>("aggregating-state", new AggregateFunction<SensorReading, String, String>() {
// 初始化输出类型前缀
@Override
public String createAccumulator() {
return "温度列表:";
}
// 每来一条数据就拼接返回值
@Override
public String add(SensorReading sensorReading, String acc) {
return acc + "," + sensorReading.getTemperature();
}
// 输出返回值
@Override
public String getResult(String acc) {
return acc;
}
// 不同分区的结果进行拼接
@Override
public String merge(String acc1, String acc2) {
return acc1 + "," + acc2;
}
}, String.class);
aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
}
@Override
public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, String>> collector) throws Exception {
aggregatingState.add(sensorReading);
collector.collect(new Tuple2<>(sensorReading.getId(), aggregatingState.get()));
}
}
}
1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势。尽管 SQL
1.概述 转载:Flink 源码阅读笔记(6)- 计算资源管理 在 Flink 中,计算资源的是以 Slot 作为基本单位进行分配的。本文将对 Flink 中计算资源的管理机制加以分析。 2.Task
1.概述 转载:Flink jvm参数配置GC日志 生产环境上,或者其他要测试 GC 问题的环境上,一定会配置上打印GC日志的参数,便于分析 GC 相关的问题。 但是可能很多人配置的都不够“完美”,要
1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl
1.概述 转载:Flink SQL代码生成与UDF重复调用的优化 2. 代码生成简介 代码生成(code generation)是当今各种数据库和数据处理引擎广泛采用的物理执行层技术之一。通过代码生成
1.概述 转载:面向流批一体的 Flink Runtime 新进展 首先是关于调度部分的性能优化。Flink 由于存在 all to all 的连接关系,两个并发为 n 的算子之间会有 n² 条边,这
在Fink源码中,有flink-stream-java和flink-stream-scala模块。 flink streaming 为什么需要两个模块? https://github.com/apac
我的要求是在一天内流式传输数百万条记录,并且它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式传输。这些是应
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
1.概述 转载:Flink 源码阅读笔记(4)- RPC 相关文章: 【Flink】Flink 源码之RPC调用 Flink】FLink 通讯组件 RPC 作为一个分布式系统,Flink 内部不同组件
1.概述 转载并且补充: flink keyby 分布不均匀问题 我使用随机数random.nextint(8)作为key,生成keyedstream之后,直接sink到存储中,但是sink算子只有四
1.概述 转载:Flink Sort-Shuffle写流程简析 转载并且补充。 2.配置 taskmanager.network.sort-shuffle.min-parallelism 核心配置。设
1.概述 转载:Flink源码分析——批处理模式Map端数据聚合 在flink的批处理模式下,数据的计算也有着map/reduce两端的计算模型,这一点和MR、spark计算框架是类似的。在数据进行分
1.概述 转载:Flink on yarn 远程调试 大家好,我是 JasonLee。 前几天有小伙伴问我,我写的 Flink 代码是提交到 yarn 上去运行的,那我怎么能远程调试代码呢?在本地调试
当我使用 flink 事件时间窗口时,窗口就是不触发。请问如何解决,有什么debug的方法吗? 最佳答案 由于您使用的是事件时间窗口,所以很可能是水印问题。该窗口仅在水印取得进展时输出。事件时间没有提
我有一个用例,我想在 Flink 上运行 2 个独立的处理流程。所以 2 个流程看起来像 Source1 -> operator1 -> Sink1 Source2 -> operator2 -> S
我们正在尝试构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。通过阅读文档,在我看来,Flink 广播状态很适合这种情况。 作为实验,我构建了一个简化版本:假设我有一
我有一个 Flink Streaming 作业,它失败了,我得到如下日志。谁能告诉我如何解决这个问题?有时运行一天就失效,有时运行几个小时就失效。 09:30:25 948 INFO (org.ap
我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafk
我是一名优秀的程序员,十分优秀!