- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
在 1.9 版本之前,Flink 运行时的状态对于用户来说是一个黑盒,我们是无法访问状态数据的,从 Flink-1.9 版本开始,官方提供了 State Processor API 这让用户读取和更新状态成为了可能,我们可以通过 State Processor API 很方便的查看任务的状态,还可以在任务第一次启动的时候基于历史数据做状态冷启动。从此状态对于用户来说是透明的。下面就来看一下 State Processor API 的使用。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.11</artifactId>
<version>1.14.4</version>
</dependency>
State Processor API 将流应用程序的状态映射到一个或多个可以单独处理的数据集。为了能够使用 API,我们先来理解一下任务的状态和 DataSets 之间是如何映射的。
让我们先看看有状态的 Flink 作业是什么样子的。Flink 作业由多个算子组成,通常有一个或多个 source 数据源,一些用于实际处理数据的算子,以及一个或多个 sink 算子。每个算子并行的在一个或多个 task 上运行,并且可以处理不同类型的状态。一个算子可以有 0、1 个或多个 operator states,这些状态被组织成 list,作用于所有的 tasks 上。如果 operator 应用于 keyed states,它还可以有 0 个、1 个或多个 keyed state,这些状态的作用域为从每个 record 中提取的 key。
下图显示了应用程序 MyApp,它由 Src、Proc 和 Snk 三个算子组成。Src 有一个 operator state 状态(os1), Proc 有一个 operator 状态(os2) 和两个 keyed state 状态(ks1, ks2),而 Snk 是无状态的。
MyApp 的 SavePoint 或 CheckPoint 由所有的状态数据组成,以便可以恢复每个 task 的状态。在使用 batch 作业处理保存点(或检查点)的数据时,我们需要将各个任务状态的数据映射到数据集或表中的心智模型。实际上,我们可以将保存点视为数据库。每个 operator(由其UID标识)代表 namespace。每一个算子的 operator state 在 namespace 里都映射到一个固定的表里,其中有一列包含所有 task 的状态数据。一个算子的所有 keyed state 都映射到由 key 的列组成的单个表,以及另外一列对应每一个 keyed state。下图显示了MyApp 的保存点如何映射到数据库。
该图显示了 Src 的 operator state 的值是如何映射到一个表的,该表有一列和五行,每一行代表 Src 的所有并行任务中的每个列表条目。算子 Proc 的 operator state(os2) 类似地映射到单个表。keyed state ks1 和 ks2 合并到一个包含三列的表中,一列表示 key,一列用于 ks1,一列用于 ks2。这个 keyed table 为两个 keyed state 的每个不同 key 保存一行。因为算子 Snk 没有任何状态,所以它的 namespace 是空的。
读取状态首先需要指定一个有效的 savepoint 或 checkpoint 的路径,以及应该用于恢复数据的 StateBackend。恢复状态的兼容性保证与恢复 DataStream 应用程序时相同。
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new HashMapStateBackend());
这里让我想到了这个文章:【Flink】Flink 如何在本地IDEA恢复检查点 不通过IDEA 不通过 flink run 方法
读取状态时支持三种不同类型的状态:
Operator State
Keyed State
Window State
也可以编写 Savepoints,它允许这样的用例,如基于历史数据的启动状态。每个 Savepoints 由一个或多个 BootstrapTransformation(下面会解释)组成,每个 BootstrapTransformation 都定义了单个算子的状态。
注意:state processor api 当前未提供 Scala API。因此,它将始终使用 Java 类型堆栈自动推断出序列化器。要为 Scala Datastream API 启动 savepoint 请在所有类型信息中手动传递。
初始化状态时支持四种不同类型的状态:
Operator State
Broadcast State
Keyed State
Window State
除了从临时创建一个 savepoint 外,你还可以基于现有的 Savepoints,当为现有作业启动单个新的算子时。
Savepoint
.load(bEnv, oldPath, new HashMapStateBackend())
.withOperator("uid", transformation)
.write(newPath);
下面就来实现一下我们平时使用最多的 Keyed State 状态的读取和写入。
package flink.state;
import bean.Jason;
import bean.UserDefinedSource;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class FlinkStreamingDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置任务的最大并行度 也就是keyGroup的个数
env.setMaxParallelism(128);
//env.getConfig().setAutoWatermarkInterval(1000L);
// 设置开启checkpoint
env.enableCheckpointing(10000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///Users/jasonlee/flink-1.14.0/checkpoint");
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStreamSource<Jason> dataStreamSource = env.addSource(new UserDefinedSource());
dataStreamSource.keyBy(k -> k.getName())
.process(new KeyedProcessFunction<String, Jason, Jason>() {
private ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(Jason value, KeyedProcessFunction<String, Jason, Jason>.Context ctx, Collector<Jason> out) throws Exception {
if (state.value() != null) {
System.out.println("状态里面有数据 :" + state.value());
value.setAge(state.value() + value.getAge());
state.update(state.value() + value.getAge());
} else {
state.update(value.getAge());
}
out.collect(value);
}
}).uid("my-uid")
.print("local-print");
env.execute();
}
}
代码非常简单,里面只用了一个 ValueState,来保存用户的 age ,key 是 name。要为带状态的算子设置唯一的 uid(“my-uid”),在读取状态的时候需要指定算子的 uid。
先把这个任务跑起来,然后只要任务 checkpoint 做成功就可以把任务停掉了。
在上面看到
// 设置任务的最大并行度 也就是keyGroup的个数
env.setMaxParallelism(128);
这点和知识点:【Flink】Flink key 应该分配到哪个 KeyGroup 以及 KeyGroup 分配在哪个subtask 串联在一起了。 我们的项目也有设置,开始我不知道什么原因。
然后来看一下生成的 ck 文件。
可以看到做了 10 次 ck,那这里我们就来读取 chk-10 这个 ck 里面的状态。
读取和写入状态的代码如下:
package flink.state;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
public class FlinkReadAndUpdateState {
private static final String ckPath = "file:///Users/jasonlee/flink-1.14.0/checkpoint/b02f75ede7e3b093eb3b58bdd5906de3/chk-10";
private static final Collection<KeyedState> data =
Arrays.asList(new KeyedState("hive", 1), new KeyedState("JasonLee1", 100), new KeyedState("hhase", 3));
public static void main(String[] args) throws Exception {
stateRead(ckPath);
//stateWrite("");
}
/**
* 从 ck 读取状态数据
* @param ckPath
* @throws Exception
*/
public static void stateRead(String ckPath) throws Exception {
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
bEnv.setParallelism(1);
ExistingSavepoint savepoint = Savepoint.load(bEnv, ckPath, new HashMapStateBackend());
DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
List<KeyedState> keyedStates = keyedState.collect();
for (KeyedState ks: keyedStates) {
System.out.println(String.format("key: %s, value: %s", ks.key, ks.value));
}
}
/**
* 初始化状态数据
* @param ckPath
*/
public static void stateWrite(String ckPath) throws Exception {
int maxParallelism = 128;
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<KeyedState> dataKeyedState = bEnv.fromCollection(data);
BootstrapTransformation<KeyedState> transformation = OperatorTransformation
.bootstrapWith(dataKeyedState)
.keyBy(k -> k.key)
.transform(new WriterFunction());
Savepoint
.create(new HashMapStateBackend(), maxParallelism)
.withOperator("uid-test", transformation)
.write("file:///Users/jasonlee/flink-1.14.0/checkpoint/init_state");
bEnv.execute();
}
public static class WriterFunction extends KeyedStateBootstrapFunction<String, KeyedState> {
ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(KeyedState value, KeyedStateBootstrapFunction<String, KeyedState>.Context ctx) throws Exception {
state.update(value.value);
}
}
public static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedState> {
ValueState<Integer> state;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void readKey(
String key,
Context ctx,
Collector<KeyedState> out) throws Exception {
KeyedState data = new KeyedState();
data.key = key;
data.value = state.value();
out.collect(data);
}
}
public static class KeyedState {
public String key;
public int value;
public KeyedState(String key, int value) {
this.key = key;
this.value = value;
}
public KeyedState() {}
}
}
这里读取和写入状态的代码放到一起了,只需调用 savepoint 的 readKeyedState 方法指定一下上面代码里面设置的 uid,还需要继承 KeyedStateReaderFunction 实现 readKey 方法就可以了。代码比较简单,这里就不在多说。直接来看一下读取的结果。
执行这个代码,打印的状态数据如下:
key: JasonLee35, value: 35
key: JasonLee66, value: 66
key: JasonLee81, value: 81
key: JasonLee74, value: 74
key: JasonLee90, value: 90
key: JasonLee36, value: 36
key: JasonLee85, value: 85
key: JasonLee39, value: 39
key: JasonLee72, value: 72
key: JasonLee65, value: 65
key: JasonLee58, value: 58
key: JasonLee9, value: 9
key: JasonLee69, value: 69
key: JasonLee82, value: 82
key: JasonLee53, value: 53
key: JasonLee6, value: 6
key: JasonLee79, value: 79
key: JasonLee32, value: 32
key: JasonLee64, value: 64
key: JasonLee76, value: 76
key: JasonLee91, value: 91
key: JasonLee18, value: 18
key: JasonLee26, value: 26
key: JasonLee40, value: 40
key: JasonLee25, value: 25
key: JasonLee54, value: 54
key: JasonLee21, value: 21
key: JasonLee55, value: 55
key: JasonLee78, value: 78
key: JasonLee71, value: 71
key: JasonLee42, value: 42
key: JasonLee56, value: 56
key: JasonLee17, value: 17
key: JasonLee88, value: 88
key: JasonLee61, value: 61
key: JasonLee27, value: 27
key: JasonLee41, value: 41
key: JasonLee12, value: 12
key: JasonLee63, value: 63
key: JasonLee5, value: 5
key: JasonLee73, value: 73
key: JasonLee67, value: 67
key: JasonLee29, value: 29
key: JasonLee31, value: 31
key: JasonLee14, value: 14
key: JasonLee92, value: 92
key: JasonLee7, value: 7
key: JasonLee45, value: 45
key: JasonLee48, value: 48
key: JasonLee11, value: 11
key: JasonLee75, value: 75
key: JasonLee84, value: 84
key: JasonLee13, value: 13
key: JasonLee77, value: 77
key: JasonLee59, value: 59
key: JasonLee83, value: 83
key: JasonLee15, value: 15
key: JasonLee37, value: 37
key: JasonLee52, value: 52
key: JasonLee30, value: 30
key: JasonLee62, value: 62
key: JasonLee34, value: 34
key: JasonLee19, value: 19
key: JasonLee87, value: 87
key: JasonLee86, value: 86
key: JasonLee38, value: 38
key: JasonLee57, value: 57
key: JasonLee10, value: 10
key: JasonLee49, value: 49
key: JasonLee46, value: 46
key: JasonLee8, value: 8
key: JasonLee28, value: 28
key: JasonLee2, value: 2
key: JasonLee89, value: 89
key: JasonLee16, value: 16
key: JasonLee24, value: 24
key: JasonLee50, value: 50
key: JasonLee3, value: 3
key: JasonLee51, value: 51
key: JasonLee44, value: 44
key: JasonLee47, value: 47
key: JasonLee33, value: 33
key: JasonLee68, value: 68
key: JasonLee22, value: 22
key: JasonLee80, value: 80
key: JasonLee20, value: 20
key: JasonLee23, value: 23
key: JasonLee1, value: 1
key: JasonLee70, value: 70
key: JasonLee60, value: 60
key: JasonLee4, value: 4
key: JasonLee43, value: 43
可以看到这个就是我们写入的状态数据。
然后再来测试一下初始化状态数据,跟读取状态刚好相反,我们需要先写入一个状态到指定的路径。然后在指定这个状态路径启动任务。
运行上面写入的代码,会在 /Users/jasonlee/flink-1.14.0/checkpoint/init_state 路径下面生成一个 _metadata 文件。来看一下生成的文件。
这里我读取状态和写入状态用的是同一个算子,也就是上面的 KeyedProcessFunction 算子,注意在恢复状态的时候需要把算子的 uid 改成和 .withOperator(“uid-test”, transformation) 参数保持一致。
然后就可以 通过下面的命令指定 ck 启动任务。
flink run -d -m yarn-cluster \
-Dyarn.application.name=FlinkStreamingNewDemoHome \
-Dyarn.application.queue=flink \
-Dmetrics.reporter.promgateway.groupingKey="jobname=FlinkStreamingNewDemoHome" \
-Dmetrics.reporter.promgateway.jobName=FlinkStreamingNewDemoHome \
-c flink.state.FlinkStreamingDemo \
-Denv.java.opts="-Dflink_job_name=FlinkStreamingNewDemoHome" \
-s hdfs:///flink-rockdb/checkpoints/init_state/_metadata \
/home/jason/bigdata/jar/flink-1.14.x-1.0-SNAPSHOT.jar
从上图可以看出任务确实是从我们指定的 ck 恢复的,这里其实和指定 checkpoint 或 savepoint 恢复任务是一样的,可以再来看一下 TM 里我们在代码里面打印的日志。
因为我们初始化了 JasonLee1 100 所以从状态里面读取出来的是 100 然后第一条数据的 age 是 1 所以打印的 JasonLee1 101 是没问题的。整个读取和写入状态的流程就结束了,其他类型的状态这里就不在演示,用法基本都是一样的。实际使用的时候根据场景选择不同类型的状态就可以了。
转载:Flink 通过 State Processor API 实现状态的读取和写入
\Processor(_Total)\% Processor Time 是什么意思。基本假设是什么? 描述:因为这是来自 WAD 性能表计数器的性能计数器;因为这是 Azure,是否有任何测量相同的假
我有一个执行有限差分计算的 CUDA 代码。该代码在 Tesla M2090 处理器上运行良好,没有错误。相同的代码会在 Tesla T10 处理器中导致大量错误。我的结果中有很多零。 有谁知道这两种
我有一个执行有限差分计算的 CUDA 代码。该代码在 Tesla M2090 处理器上运行良好,没有错误。相同的代码会在 Tesla T10 处理器中导致大量错误。我的结果中有很多零。 有谁知道这两种
我正在尝试让 QueryDSL 用于 Spring Roo 项目。 这是我的插件配置: com.mysema.maven maven-
关注 this question ,我正在尝试从 this tutorial 组装示例代码 #include "p10f200.inc" ; CONFIG __CONFIG _WDT_OFF
我是qemu的新手,我读到它允许单步模式仿真。这很有用,因为我试图在每个周期中转储物理内存的某些地址。不幸的是,qemu文档非常糟糕。我知道如何从qemu监视器启用单步模式,但是我不知道将要在每个步骤
我的问题来自 Mystical's answer .据我了解,您有一条分支指令,它可以转到另一条指令,例如 0x123344或者它可以继续执行。 如果分支预测器根据过去的模式从其中任何一个进行猜测,它
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
是否由地址总线的大小确定?如果是,那么8086是20位处理器吗?如果没有,为处理器分配8位,16位,32位之类的位的标准是什么? 最佳答案 它的定义不明确。正如xtofl所指出的,广义上讲,它是原子计
我一直在寻找用于撰写技术论文的文字处理器,但我还没有真正找到。拥有一个可以很好地处理数学表达式、代码和伪代码的编辑器真的很不错。我还没有找到一种效果很好的。 有人有什么建议吗? 最佳答案 我个人相信
这个问题看起来太简单了,但我是在看了几个 ppt 后才问的。 这两种方法都提高了指令吞吐量。 super 扩展几乎也总是利用管道。 super 缩放有多个执行单元,管道也是如此,还是我错了? 最佳答案
我目前正在 LogiSim 中开发 6502 的一个子集,在当前阶段我正在确定要实现哪些部分以及可以删除哪些部分。我的主要资源之一是Hanson's Block Diagram . 我目前正在尝试确定
我目前正在 LogiSim 中开发 6502 的子集。我的主要资源之一是Hanson's Block Diagram . 我正在尝试确定应该如何以及在哪里构建电路来更新处理器状态寄存器。在下面的处理器
我对 Apache-airflow 非常陌生,刚开始在 udemy (this course) 中学习类(class)。 我们已收到 YAML 文件,并被要求按照说明安装 Airflow 。我相信我已
对于大学中期项目,我必须设计一个可配置的处理器,用 VHDL 编写代码,然后在 Digilent 的 Spartan 3E FPGA 板上进行综合。我是一个初学者,所以你能指点我一些关于可配置处理器的
我正在尝试获取处理器信息,特别是像 这样的名称Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz . 搜索网络我发现了一个函数,它使用 for-each 为我提供了处理器
我们的项目中有两个注释,我想收集带注释的类并基于两个类列表创建合并的输出。 只用一个 Processor 可以吗?实例?我怎么知道 Processor每个带注释的类都调用了实例? 最佳答案 框架调用
我有以下插入语句: 插入 temp1 值 (test1, test2) 插入 temp2 值 (test3) 预期结果: 插入 temp1 值 (100, 200) 插入 temp2 值 (300)
C99 标准第 7.23.1 节第 1 段定义了几个“时间”术语: Many functions deal with a calendar time that represents the curre
我正在编写注释处理器,我需要 TreeTranslator.visitIdent 来放置静态方法调用。我想我应该使用 TreeMaker.Call 或 TreeMaker.Create,还是应该使用
我是一名优秀的程序员,十分优秀!