gpt4 book ai didi

java - 初始化MapState的内容

转载 作者:行者123 更新时间:2023-11-30 05:27:10 25 4
gpt4 key购买 nike

我已经实现了一个具有以下结构的 Flink RichFunction :

public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {

private MapState<String, MyState> myState;

@Override
public void open(Configuration conf)throws Exception{
myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));
}

@Override
public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
MyState state = myState.get(value.ID());

// Do things
}

@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
state.put(value.ID(), value.state()); // Update the mapState with value from broadcast
}

// retrieve all the state values and put them in the MapState
private void initialState() throws Exception{
Map<String, MyState> initialValues = ...;
this.cameras.putAll(initialValues);
}
}

mapState 变量存储通过 BroadcastedStream 更新的多个状态。更新是在 processBroadcastElement() 函数中完成的。

在作业开始时,我想使用 initialState() 函数初始化 mapState

问题是我无法在 open() 函数中使用它(请参阅 here 为什么)

在这种情况下初始化mapState的正确方法是什么? (并且在所有使用 RichFunctions 的情况下)

最佳答案

您想要实现 org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

当您这样做时,您实现了两种方法:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

// called when it's time to save state

myState.clear();

// Update myState with current application state

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

// called when things start up, possibly recovering from an error

descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));

myState = context.getKeyedStateStore().getMapState(descriptor);

if (context.isRestored()) {

// restore application state from myState

}

}

您可以在initializeState()方法而不是open()中初始化myState变量。

关于java - 初始化MapState的内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58307154/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com