- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
从Flink是否接管角度:可以分为:ManagedState(托管状态) 和 RawState(原始状态)
两者的区别如下:
1、从状态管理方式的方式来说,Managed State 由 Flink Runtime 管理,自动存储,自动恢复,在内存管理上有优化;而 Raw State 需要用户自己管理,需要自己序列化,Flink 不知道 State 中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构。
2、从状态数据结构来说,Managed State 支持已知的数据结构,如 Value、List、Map 等。而 Raw State只支持字节数组 ,所有状态都要转换为二进制字节数组才可以。
3、从推荐使用场景来说,Managed State 大多数情况下均可使用,而 Raw State 是当 Managed State 不够用时,比如需要自定义 Operator 时,才会使用 Raw State。
在实际生产中,都只推荐使用ManagedState,后续将围绕该话题进行讨论。
Managed State 分为两种,Keyed State 和 Operator State (Raw State都是Operator State)
在Flink Stream模型中,Datastream 经过 keyBy 的操作可以变为 KeyedStream。
Keyed State是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,如stream.keyBy(…)
KeyBy之后的State,可以理解为分区过的State,每个并行keyed Operator的每个实例的每个key都有一个Keyed State,即<parallel-operator-instance,key>就是一个唯一的状态,由于每个key属于一个keyed Operator的并行实例,因此我们将其简单的理解为<operator,key>
这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为 list state 的 operator state
Operator State又称为 non-keyed state,与Key无关的State,每一个 operator state 都仅与一个 operator 的实例绑定。
Operator State 可以用于所有算子,但一般常用于 Source
前面说过有状态计算其实就是需要考虑历史数据
而历史数据需要搞个地方存储起来
Flink为了方便不同分类的State的存储和管理,提供了如下的API/数据结构来存储State!
Keyed State 通过 RuntimeContext 访问,这需要 Operator 是一个RichFunction。保存Keyed state的数据结构:
ValueState
:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值,如求按用户id统计用户交易总额 ListState
:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable 来遍历状态值,如统计按用户id统计用户经常登录的Ip ReducingState
:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值 MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素
需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄
Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口。
保存Operator state的数据结构:
ListState
BroadcastState<K,V>
举例来说,Flink中的FlinkKafkaConsumer,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射
下图就word count 的 sum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state:
官网示例://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/
需求:使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)
编码步骤:
//-1.定义一个状态用来存放最大值
private transient ValueState
maxValueState; //-2.创建一个状态描述符对象
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
//-3.根据状态描述符获取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
//-4.使用State
Long historyValue = maxValueState.value();
//判断当前值和历史值谁大
if (historyValue == null || currentValue > historyValue)
//-5.更新状态
maxValueState.update(currentValue);
代码示例:
package com.ddkk.state;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author ddkk.com 弟弟快看,程序员编程资料站
* Desc
* 使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
*/
public class StateDemo01_KeyedState {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//方便观察
//2.Source
DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
Tuple2.of("北京", 1L),
Tuple2.of("上海", 2L),
Tuple2.of("北京", 6L),
Tuple2.of("上海", 8L),
Tuple2.of("北京", 3L),
Tuple2.of("上海", 4L)
);
//3.Transformation
//使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
//实现方式1:直接使用maxBy--开发中使用该方式即可
//min只会求出最小的那个字段,其他的字段不管
//minBy会求出最小的那个字段和对应的其他的字段
//max只会求出最大的那个字段,其他的字段不管
//maxBy会求出最大的那个字段和对应的其他的字段
SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0)
.maxBy(1);
//实现方式2:使用KeyState中的ValueState---学习测试时使用,或者后续项目中/实际开发中遇到复杂的Flink没有实现的逻辑,才用该方式!
SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
.map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
//-1.定义状态用来存储最大值
private ValueState<Long> maxValueState = null;
@Override
public void open(Configuration parameters) throws Exception {
//-2.定义状态描述符:描述状态的名称和里面的数据类型
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
//-3.根据状态描述符初始化状态
maxValueState = getRuntimeContext().getState(descriptor);
}
@Override
public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
//-4.使用State,取出State中的最大值/历史最大值
Long historyMaxValue = maxValueState.value();
Long currentValue = value.f1;
if (historyMaxValue == null || currentValue > historyMaxValue) {
//5-更新状态,把当前的作为新的最大值存到状态中
maxValueState.update(currentValue);
return Tuple3.of(value.f0, currentValue, currentValue);
} else {
return Tuple3.of(value.f0, currentValue, historyMaxValue);
}
}
});
//4.Sink
//result.print();
result2.print();
//5.execute
env.execute();
}
}
下图对word count 示例中的FromElementsFunction类进行详解并分享如何在代码中使用 operator state:
官网代码示例://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/
需求:使用ListState存储offset模拟Kafka的offset维护
编码步骤:
//-1.声明一个OperatorState来记录offset
private ListState
offsetState = null; private Long offset = 0L;
//-2.创建状态描述器
ListStateDescriptor
descriptor = new ListStateDescriptor ("offsetState", Long.class); //-3.根据状态描述器获取State
offsetState = context.getOperatorStateStore().getListState(descriptor);
//-4.获取State中的值
Iterator
iterator = offsetState.get().iterator(); if (iterator.hasNext()) {//迭代器中有值
offset = iterator.next();//取出的值就是offset
}
offset += 1L;
ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",当前的offset为:" + offset);
if (offset % 5 == 0) {//每隔5条消息,模拟一个异常
//-5.保存State到Checkpoint中
offsetState.clear();//清理内存中存储的offset到Checkpoint中
//-6.将offset存入State中
offsetState.add(offset);
代码示例:
package com.ddkk.state;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* Author ddkk.com 弟弟快看,程序员编程资料站
* Desc
* 需求:
* 使用OperatorState支持的数据结构ListState存储offset信息, 模拟Kafka的offset维护,
* 其实就是FlinkKafkaConsumer底层对应offset的维护!
*/
public class StateDemo02_OperatorState {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学
env.enableCheckpointing(1000);//每隔1s执行一次Checkpoint
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));
//2.Source
DataStreamSource<String> sourceData = env.addSource(new MyKafkaSource());
//3.Transformation
//4.Sink
sourceData.print();
//5.execute
env.execute();
}
/**
* MyKafkaSource就是模拟的FlinkKafkaConsumer并维护offset
*/
public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
//-1.声明一个OperatorState来记录offset
private ListState<Long> offsetState = null;
private Long offset = 0L;
private boolean flag = true;
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
//-2.创建状态描述器
ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);
//-3.根据状态描述器初始化状态
offsetState = context.getOperatorStateStore().getListState(descriptor);
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
//-4.获取并使用State中的值
Iterator<Long> iterator = offsetState.get().iterator();
if (iterator.hasNext()){
offset = iterator.next();
}
while (flag){
offset += 1;
int id = getRuntimeContext().getIndexOfThisSubtask();
ctx.collect("分区:"+id+"消费到的offset位置为:" + offset);//1 2 3 4 5 6
//Thread.sleep(1000);
TimeUnit.SECONDS.sleep(2);
if(offset % 5 == 0){
System.out.println("程序遇到异常了.....");
throw new Exception("程序遇到异常了.....");
}
}
}
@Override
public void cancel() {
flag = false;
}
/**
* 下面的snapshotState方法会按照固定的时间间隔将State信息存储到Checkpoint/磁盘中,也就是在磁盘做快照!
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//-5.保存State到Checkpoint中
offsetState.clear();//清理内存中存储的offset到Checkpoint中
//-6.将offset存入State中
offsetState.add(offset);
}
}
}
我正在通读 Windows Phone 7.5 Unleashed,有很多代码看起来像这样(在页面的代码隐藏中): bool loaded; protected override void OnNav
在cgi服务器中,我这样返回 print ('Status: 201 Created') print ('Content-Type: text/html') print ('Location: htt
我正在查看 esh(easy shell)的实现,无法理解在这种情况下什么是 22 和 9 信号。理想情况下,有一个更具描述性的常量,但我找不到列表。 最佳答案 信号列表及其编号(包括您看到的这两个)
我的Oozie Hive Action 永远处于运行模式。 oozie.log文件中没有显示错误。
我正在编写一个使用 RFCOMM 通过蓝牙连接到设备的 Android 应用程序。我使用 BluetoothChat 示例作为建立连接的基础,大部分时间一切正常。 但是,有时由于出现套接字已打开的消息
我有一个云调度程序作业,它应该每小时访问我的 API 以更新一些价格。这些作业大约需要 80 秒才能运行。 这是它的作用: POST https://www.example.com/api/jobs/
我正在 Tomcat 上访问一个简单的 JSP 页面: 但是当我使用 curl 测试此页面时,我得到了 200 响应代码而不是预期的 202: $ curl -i "http://localhos
有时 JAR-RS 客户端会发送错误的语法请求正文。服务器应响应 HTTP status 400 (Bad Request) , 但它以 HTTP status 500 (Internal Serve
我正在尝试通过 response.send() 发送一个整数,但我不断收到此错误 express deprecated res.send(status): Use res.sendStatus(sta
我已经用 Excel 和 Java 做过很多次了……这次我需要用 Stata 来做,因为保存变量更方便'labels .如何将 dataset_1 重组为下面的 dataset_2? 我需要转换以下
我正在创建一个应用程序,其中的对象具有状态查找功能。为了提供一些上下文,让我们使用以下示例。 帮助台应用程序,其中创建作业并通过以下工作流程移动: 新 - 工作已创建但未分配 进行中 - 分配给工作人
我想在 Keras 中运行 LSTM 并获得输出和状态。在 TF 中有这样的事情 with tf.variable_scope("RNN"): for time_step in range
有谁知道 Scala-GWT 的当前状态 项目? 那里的主要作者 Grzegorz Kossakowski 似乎退出了这个项目,在 Spring 中从事 scalac 的工作。 但是,在 interv
我正在尝试编写一个 super 简单的 applescript 来启动 OneDrive App , 或确保打开,当机器的电源设置为插入时,将退出,或确保关闭,当电源设置为电池时。 我无法找到如何访问
目前我正在做这样的事情 link.on('click', function () { if (link.attr('href') !== $route.current.originalPath
是否可以仅通过查看用户代理来检测浏览器上是否启用/禁用 Javascript。 如果是,我应该寻找什么。如果否,检测用户浏览器是否启用/禁用 JavaScript 的最佳方法是什么 最佳答案 不,没有
Spring 和 OSGi 目前的开发状况如何? 最近好像有点安静了。 文档的最新版本 ( http://docs.spring.io/osgi/ ) 来自 2009 年。 我看到一些声明 Sprin
我正在从主函数为此类创建一个线程,但即使使用 Thread.currentThread().interrupt() 中断它,输出仍然包含“Still Here”行。 public class Writ
为了满足并发要求,我想知道如何在 Godog 中的多个步骤之间传递参数或状态。 func FeatureContext(s *godog.Suite) { // This step is ca
我有一个UIButton子类,它不使用UIImage背景,仅使用背景色。我注意到的一件事是,当您设置按钮的背景图像时,有一个默认的突出显示状态,当按下按钮时,该按钮会稍微变暗。 这是我当前的代码。
我是一名优秀的程序员,十分优秀!