gpt4 book ai didi

java - Apache 点燃: Serialization error related to Data Streaming

转载 作者:行者123 更新时间:2023-12-01 21:18:28 27 4
gpt4 key购买 nike

我正在尝试研究 Apache Ignite 流式传输的工作原理。我有 2 个节点集群设置(都在本地主机上),并且启动一个使用 StreamTransformer 和 EntryProcessor 运行流代码的客户端节点。结果,我的一个节点出现了无法反序列化异常。我的代码是来自 Ignite 文档的简化 WordCount 示例:

public class StreamingExample {`
public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> {
@Override
public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException {
Long val = e.getValue();
e.setValue(val == null ? 1L : val + 1);
return null;
}
}

public static void main(String[] args) throws IgniteException, IOException {
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache");
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
stmr.allowOverwrite(true);
stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor()));
stmr.addData("word", 1L);
System.out.println("Finished");
}
}
}

}

异常我得到两个节点之一是

[23:38:23] Topology snapshot [ver=5, servers=2, clients=1, CPUs=4, heap=3.3GB] Exception in thread "pub-#9%null%" class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595) at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1663) at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:298) at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal(BinaryMarshaller.java:109) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:50) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:80) at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1238) at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866) at org.apache.ignite.internal.managers.communication.GridIoManager.access$1700(GridIoManager.java:106) at org.apache.ignite.internal.managers.communication.GridIoManager$5.run(GridIoManager.java:829) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find class with given class loader for unmarshalling (make sure same versions of all classes are avai lable on all nodes or enable peer-class-loading): sun.misc.Launcher$AppClassLoader@4e857327 at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:224) at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1592) ... 13 more Caused by: java.lang.ClassNotFoundException: gridgaingames.StreamingExample$StreamingExampleCacheEntryProcessor at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350) at org.apache.ignite.internal.MarshallerContextAdapter.getClass(MarshallerContextAdapter.java:185) at org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579) at org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218) ... 14 more

有几件事我无法得到。

1)我该如何修复它?

2)由于这不是“广播”或其他东西,我认为 Ignite 仅在调用节点上运行流代码。看来我错了。那么我的 Streaming 代码在哪里执行?

3)打印“完成”行后,我的代码不会停止。为什么?看起来一些非守护线程仍然存在。这是阻止我的客户端节点退出的流代码吗?

附注

启用对等类加载。如果我使用广播运行一些在许多节点上执行代码的示例 - 它可以正常工作。

最佳答案

基本上,IgniteDataStreamer 在发送方(示例中的客户端)准备数据批处理,并将它们立即发送到应存储特定键值元组的目标节点。记住这一点,您的问题的答案如下:

  1. 在将条目放入缓存之前,转换器会在目标节点(服务器节点)上执行。这意味着服务器节点的类路径中必须有转换器的类,或者,您必须启用对等类加载。个人认为后者是更灵活、更可取的方案。
  2. 正如上面所解释的,发送者只需准备发送到部署缓存的所有服务器的批处理。服务器仅接收包含服务器作为主元组或备份元组的那些批处理。
  3. 批处理的刷新发生在后台,因为 IgniteDataStreamer 用于快速数据预加载或复杂的流处理 (CEP)。有许多参数可让您调整刷新 - autoFlustFrequencyperNodeBufferSize

最后,对于预加载需求(当缓存为空并且需要填充它们时),我建议将 allowOverwrite 设置为 false 这将允许流媒体准备并分别为主节点和备份节点发送批处理。如果此参数设置为 true,则仅在主节点上发送批处理,并且主节点在更新其数据版本和相应的版本后,使用基本的cache.put 操作注入(inject)数据。备份。如果您只需要预加载缓存,则此方法会比较慢。

关于java - Apache 点燃: Serialization error related to Data Streaming,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39539835/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com