
java - The implementation of the MapFunction is not serializable (Flink)

Reposted. Author: 行者123. Last updated: 2023-12-01 17:30:37
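I am trying to implement a class that lets users operate on N input streams, regardless of the streams' element types.

First, I want to convert all input DataStreams into KeyedStreams. To do that, I map each input stream to a tuple and then apply keyBy to turn it into a keyed stream.

I keep running into serialization problems. I tried following the guide at https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html but it did not help.

What I would like to know:

  1. What is serialization/deserialization in Java, and what is it used for?
  2. Which problems does serialization solve in Flink?
  3. What is wrong with my code? (The code and the error message are below.)

Thank you very much.

Main class: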

public class CEP {

    private Integer streamsIdComp = 0;
    final private Map<Integer, DataStream<?>> dataStreams = new HashMap<>();
    final private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();

    public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream) {

        Preconditions.checkNotNull(inputStream, "dataStream");
        TypeInformation<T> streamType = inputStream.getType();

        KeyedStream<Tuple2<Integer, T>, Integer> keyedInputStream = inputStream
            .map(new MapFunction<T, Tuple2<Integer, T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    return Tuple2.of(streamsIdComp, value);
                }
            })
            .keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
        return keyedInputStream;
    }

    public <T1> void addInputStream(DataStream<T1> inputStream) {

        TypeInformation<T1> streamType = inputStream.getType();

        dataStreamsTypes.put(streamsIdComp, streamType);
        dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
        streamsIdComp++;
    }
}

Test class:

public class CEPTest {

    @Test
    public void addInputStreamTest() throws Exception {
        // test if we can change keys in a keyedStream
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Record> input1 = env.fromElements(
                new Record("1", 1, "a"),
                new Record("2", 2, "b"),
                new Record("3", 3, "c"))
            .keyBy(Record::getBizName);

        DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);

        CEP cepObject = new CEP();
        cepObject.addInputStream(input1);
        cepObject.addInputStream(input2);
    }
}

Error message:

org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction 
is not serializable. The implementation accesses fields of its enclosing class, which is a
common reason for non-serializability. A common solution is to make the function a proper
(non-inner) class, or a static inner class.

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
at CEP.converttoKeyedStream(CEP.java:25)
at CEP.addInputStream(CEP.java:45)
at CEPTest.addInputStreamTest(CEPTest.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.NotSerializableException: CEP
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 29 more

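Best answer

Flink is a distributed framework. That means your program may run on thousands of nodes, and each worker node must receive the code to execute together with the context it needs. Simplifying slightly, both the events flowing through the system and the functions to be executed must be serializable, because they are sent over the wire. That is why serialization matters in distributed programming.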


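In short, serialization is the process of encoding data into a byte representation that can be transmitted to another node (another JVM) and restored there. Deserialization is the reverse step: rebuilding the object from those bytes.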
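
To make that concrete, here is a minimal sketch of a round trip using plain Java serialization (`ObjectOutputStream` / `ObjectInputStream`). The `Event` class and the helper names `toBytes` / `fromBytes` are made up for illustration and are not part of the question's code. Flink generally uses its own type serializers for the records themselves, so treat this as an illustration of the idea rather than of what Flink does internally for every object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {

    // Hypothetical event type; implementing Serializable opts it in to Java serialization.
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final int value;

        Event(String id, int value) {
            this.id = id;
            this.value = value;
        }
    }

    // Encode an object into bytes, as would happen before sending it to another JVM.
    static byte[] toBytes(Object obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return buffer.toByteArray();
    }

    // Decode the bytes back into an object, as the receiving JVM would.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        Event original = new Event("a", 1);
        Event copy = (Event) fromBytes(toBytes(original));
        // The copy carries the same field values but is a different object.
        System.out.println(copy.id + " " + copy.value + " sameObject=" + (copy == original));
    }
}
```

The restored `Event` has the same field values as the original but is a separate object, which is what happens when state is shipped from one JVM to another.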


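Back to the question. This is the root cause:

Caused by: java.io.NotSerializableException: CEP

It is triggered by this line:

return Tuple2.of(streamsIdComp, value);

You are using the streamsIdComp variable, which is a field of the CEP class. The anonymous MapFunction therefore holds a reference to the enclosing CEP instance, so Flink would have to serialize the whole CEP object to make that field available when the MapFunction runs, and CEP does not implement Serializable. You can work around this by introducing a local variable in the converttoKeyedStream function: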

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream) {

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();
    // note this variable is local
    int localStreamsIdComp = streamsIdComp;

    KeyedStream<Tuple2<Integer, T>, Integer> keyedInputStream = inputStream
        .map(new MapFunction<T, Tuple2<Integer, T>>() {
            @Override
            public Tuple2<Integer, T> map(T value) throws Exception {
                // and is used here
                return Tuple2.of(localStreamsIdComp, value);
            }
        })
        .keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                return integerTTuple2.f0;
            }
        });
    return keyedInputStream;
}

This way Flink only has to serialize that single variable rather than the whole class.
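
The capture behaviour can be reproduced without Flink. The sketch below is only an illustration under some assumptions: `SerializableMapper` is a made-up stand-in for Flink's `MapFunction` (which is likewise `Serializable`), `Owner` stands in for the non-serializable `CEP` class, and `Tagger` shows the "static inner class" option that the error message suggests. It uses lambdas instead of anonymous inner classes, because whether an anonymous inner class keeps a hidden reference to its enclosing instance can depend on the compiler version, and in Flink the closure cleaner is involved as well.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CaptureDemo {

    // Hypothetical stand-in for Flink's MapFunction, which is likewise Serializable.
    interface SerializableMapper<T, R> extends Serializable {
        R map(T value);
    }

    // Hypothetical stand-in for the CEP class: deliberately NOT Serializable.
    static class Owner {
        private Integer streamId = 7;

        // Reads the field inside the lambda, so the lambda captures `this` (the Owner).
        SerializableMapper<String, String> capturingField() {
            return value -> streamId + ":" + value;
        }

        // Copies the field to a local first, so the lambda captures only an int.
        SerializableMapper<String, String> capturingLocal() {
            int localId = streamId;
            return value -> localId + ":" + value;
        }

        // Hands the value to a static nested class through its constructor.
        SerializableMapper<String, String> staticNested() {
            return new Tagger(streamId);
        }
    }

    // Static nested class: no hidden reference to Owner; state is passed explicitly.
    static final class Tagger implements SerializableMapper<String, String> {
        private static final long serialVersionUID = 1L;
        private final int id;

        Tagger(int id) {
            this.id = id;
        }

        @Override
        public String map(String value) {
            return id + ":" + value;
        }
    }

    // Returns true if Java serialization accepts the object, false if it hits a
    // non-serializable object somewhere in the graph.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O failure", e);
        }
    }

    public static void main(String[] args) {
        Owner owner = new Owner();
        System.out.println("field capture serializable: " + isSerializable(owner.capturingField()));
        System.out.println("local capture serializable: " + isSerializable(owner.capturingLocal()));
        System.out.println("static nested serializable: " + isSerializable(owner.staticNested()));
    }
}
```

Only the first variant drags the `Owner` instance into the serialized object graph, so it is the only one that fails. The static nested class is a reasonable alternative to the local-variable copy when the function needs more than one value from the enclosing object.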

Regarding java - The implementation of the MapFunction is not serializable (Flink), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61128734/
