
java - How to serialize an ExecutorService in Java?


I created a CountMinSketch to compute the minimum frequency of certain values, and I use an ExecutorService to update the sketch asynchronously. I use this class in my Flink project, where it has to be serializable, so I implement the Serializable interface. However, that is not enough, because the ExecutorService also has to be serializable. How can I use an ExecutorService in a serializable way? Or is there any serializable ExecutorService implementation?

import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CountMinSketch implements Serializable {

    private static final long serialVersionUID = 1123747953291780413L;

    private static final int H1 = 0;
    private static final int H2 = 1;
    private static final int H3 = 2;
    private static final int H4 = 3;
    private static final int LIMIT = 100;
    private final int[][] sketch = new int[4][LIMIT];

    final NaiveHashFunction h1 = new NaiveHashFunction(11, 9);
    final NaiveHashFunction h2 = new NaiveHashFunction(17, 15);
    final NaiveHashFunction h3 = new NaiveHashFunction(31, 65);
    final NaiveHashFunction h4 = new NaiveHashFunction(61, 101);

    private ExecutorService executor = Executors.newSingleThreadExecutor();

    public CountMinSketch() {
        // initialize sketch
    }

    public Future<Boolean> updateSketch(String value) {
        return executor.submit(() -> {
            sketch[H1][h1.getHashValue(value)]++;
            sketch[H2][h2.getHashValue(value)]++;
            sketch[H3][h3.getHashValue(value)]++;
            sketch[H4][h4.getHashValue(value)]++;
            return true;
        });
    }

    public Future<Boolean> updateSketch(String value, int count) {
        return executor.submit(() -> {
            sketch[H1][h1.getHashValue(value)] = sketch[H1][h1.getHashValue(value)] + count;
            sketch[H2][h2.getHashValue(value)] = sketch[H2][h2.getHashValue(value)] + count;
            sketch[H3][h3.getHashValue(value)] = sketch[H3][h3.getHashValue(value)] + count;
            sketch[H4][h4.getHashValue(value)] = sketch[H4][h4.getHashValue(value)] + count;
            return true;
        });
    }

    public int getFrequencyFromSketch(String value) {
        int valueH1 = sketch[H1][h1.getHashValue(value)];
        int valueH2 = sketch[H2][h2.getHashValue(value)];
        int valueH3 = sketch[H3][h3.getHashValue(value)];
        int valueH4 = sketch[H4][h4.getHashValue(value)];
        return findMinimum(valueH1, valueH2, valueH3, valueH4);
    }

    private int findMinimum(final int a, final int b, final int c, final int d) {
        return Math.min(Math.min(a, b), Math.min(c, d));
    }
}

import java.io.Serializable;

public class NaiveHashFunction implements Serializable {

    private static final long serialVersionUID = -3460094846654202562L;
    private final static int LIMIT = 100;
    private long prime;
    private long odd;

    public NaiveHashFunction(final long prime, final long odd) {
        this.prime = prime;
        this.odd = odd;
    }

    public int getHashValue(final String value) {
        int hash = value.hashCode();
        if (hash < 0) {
            hash = Math.abs(hash);
        }
        return calculateHash(hash, prime, odd);
    }

    private int calculateHash(final int hash, final long prime, final long odd) {
        return (int) ((((hash % LIMIT) * prime) % LIMIT) * odd) % LIMIT;
    }
}

The Flink class:

public static class AverageAggregator implements
        AggregateFunction<Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double>, Tuple3<Double, Long, Integer>, Tuple2<String, Double>> {

    private static final long serialVersionUID = 7233937097358437044L;
    private String functionName;
    private CountMinSketch countMinSketch = new CountMinSketch();
    .....
}

The error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the AggregateFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:811)
at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:730)
at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:701)
at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt2.<init>(MultiSensorMultiStationsReadingMqtt2.java:39)
at org.sense.flink.App.main(App.java:141)
Caused by: java.io.NotSerializableException: java.util.concurrent.Executors$FinalizableDelegatedExecutorService
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 6 more

Best Answer

An ExecutorService contains state that cannot be serialized. In particular, the worker threads, and the state of the tasks they are working on, can never be serialized with the standard object serialization classes.

If you don't actually need to serialize the ExecutorService, you can mark the variable that references it as transient, to stop it from being serialized accidentally.
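
For example, a minimal sketch of that approach, adapted from the CountMinSketch class above (the lazy executor() helper is an illustration, not part of the original code), could look like this:

import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CountMinSketch implements Serializable {

    private static final long serialVersionUID = 1123747953291780413L;

    // transient: skipped by Java serialization, so the field is null after deserialization
    private transient ExecutorService executor = Executors.newSingleThreadExecutor();

    // Re-create the executor lazily, e.g. on the first submit after the object
    // has been deserialized on a Flink worker.
    private synchronized ExecutorService executor() {
        if (executor == null || executor.isShutdown()) {
            executor = Executors.newSingleThreadExecutor();
        }
        return executor;
    }

    public Future<Boolean> updateSketch(String value) {
        // submit through executor() instead of using the field directly
        return executor().submit(() -> {
            // ... update the sketch rows as in the original class ...
            return true;
        });
    }
}

With this change the field is no longer part of the serialized form, so the AverageAggregator should no longer trip the NotSerializableException shown above; each worker simply creates its own executor when it first needs one.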

Conceivably, you could serialize the ExecutorService's work queue. But serializing the tasks that are currently executing would require you to implement a custom mechanism for checkpointing a task's Callable/Runnable while it is running.


If you are trying to use serialization itself as a mechanism for checkpointing a computation, you are probably barking up the wrong tree. Serialization cannot capture the state held on a thread's stack.

Regarding "java - How to serialize an ExecutorService in Java?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54905377/
