gpt4 book ai didi

java - Apache 弗林克 : executing a program which extends the RichFlatMapFunction on the remote cluster causes error

转载 作者:行者123 更新时间:2023-11-30 03:08:11 24 4
gpt4 key购买 nike

我在 Apache Flink 中有以下代码。它在本地集群中运行良好,而在远程集群上运行它会在包含命令“stack.push(recordPair);”的行中生成 NullPointerException 错误。

有谁知道,这是什么原因?

本地和远程集群的输入数据集相同。

public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair ;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map ;
private static Stack<Tuple2< Integer,Integer>> stack ;
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
stack = new Stack<Tuple2< Integer,Integer>>();
}
@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}

最佳答案

问题在于您在 TC 类的构造函数中初始化了 stack 变量。这只会为客户端程序运行的 JVM 初始化静态变量。对于本地执行,这是可行的,因为 Flink 作业是在同一个 JVM 中执行的。

当您在集群上运行它时,您的TC将被序列化并传送到集群节点。实例的反序列化不会再次调用构造函数来初始化堆栈。为了实现此目的,您应该将初始化逻辑移至 RichFlatMapFunctionopen 方法或使用静态初始化器。但请注意,在同一 TaskManager 上运行的所有运算符将共享相同的 stack 实例,因为它是一个类变量。

public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map;
// either use a static initializer
private static Stack<Tuple2< Integer,Integer>> stack = new Stack<Tuple2< Integer,Integer>>();
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
}

@Override
public void open(Configuration config) {
// or initialize stack here, but here you have to synchronize the initialization
...
}

@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}
}

关于java - Apache 弗林克 : executing a program which extends the RichFlatMapFunction on the remote cluster causes error,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34224423/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com