gpt4 book ai didi

java - Flink RichMapFunction 中未调用 open 方法

转载 作者:太空宇宙 更新时间:2023-11-04 11:31:14 24 4
gpt4 key购买 nike

我正在尝试使用 apache flink 作为 Shortcuts 中描述的简单示例。 。但是,我注意到 open 方法从未被调用,因此我在 map 函数的第一行得到空指针异常。

public class MyMap extends RichMapFunction<Integer, Integer> {

private ValueState<Integer> test;

public void open(Configuration cfg) {
test = getRuntimeContext().getState(new
ValueStateDescriptor<Integer>("myTest", Integer.class));
System.out.println("1:" + test);
}


@Override
public Integer map(Integer i) throws Exception {
System.out.println("2:" + test.value()); //test is null here
test.update(test.value() == null? 1: test.value() + 1);
System.out.println("3:" + test.value());
return i;
}
}

最佳答案

更新:

您是否尝试@Override open 函数?

test test.value 第一次应该为 null 。您处于键控上下文中,这意味着每条消息都有一个 flink 已经知道的键。当您输入有状态运算符时,flink 将尝试从配置的状态后端获取该键的值。除非您将 ValueStateDescriptor 配置为具有默认值(已弃用),否则第一次处理特定键的消息时,状态将为 null。因此,您的应用程序应该处理 null 值。

尝试以下示例(我的 java 很生锈,这是在 scala 中。如果您需要转换它的帮助,请询问我):

env.fromElements(("key1", 2),("key2", 4), ("key1", 5))
.keyBy(_._1)
.map {
new RichMapFunction[(String, Int), (String, Int)] {

lazy val stateTypeInfo: TypeInformation[Int] = implicitly[TypeInformation[Int]]
lazy val serializer: TypeSerializer[Int] = stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
lazy val stateDescriptor = new ValueStateDescriptor[Int]("dummy state", serializer)

var testVar: ValueState[Int] = _

override def open(config: Configuration) = {
testVar = this.getRuntimeContext.getState(stateDescriptor)
}

override def map(in: (String, Int)): (String, Int) = {
println(s"message $in")
println(s"state ${testVar.value()}")
println()
val sum = Option(testVar.value()).getOrElse(0) + in._2
testVar.update(sum)
(in._1, sum)
}
}
}.print()

env.execute()

这应该产生:

message (key1,2) (first time key1 is seen)
state null (state is null)

(key1,2) (output)
message (key2,4) (first time key2 is seen)
state null (state is null)

(key2,4) (output)
message (key1,5) (second time key1 is seen!! We stored something there!)
state 2 (we stored a 2)

(key1,7) (thus output is 2+5=7)

关于java - Flink RichMapFunction 中未调用 open 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43767329/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com