Apache Ignite:与数据流相关的序列化错误
Apache Ignite: Serialization error related to Data Streaming
我正在尝试研究 Apache Ignite 流式处理的工作原理。我有 2 个节点集群设置(都在本地主机上),并且我启动了一个客户端节点,该节点使用 StreamTransformer 和 EntryProcessor 运行s 流代码。结果在我的一个节点中我得到无法反序列化异常。我的代码是来自 Ignite 文档的简化 WordCount 示例:
public class StreamingExample {`
public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> {
@Override
public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException {
Long val = e.getValue();
e.setValue(val == null ? 1L : val + 1);
return null;
}
}
public static void main(String[] args) throws IgniteException, IOException {
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache");
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
stmr.allowOverwrite(true);
stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor()));
stmr.addData("word", 1L);
System.out.println("Finished");
}
}
}
}
异常我得到一个两个节点之一是
[23:38:23] Topology snapshot [ver=5, servers=2, clients=1, CPUs=4, heap=3.3GB]
Exception in thread "pub-#9%null%" class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller
at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595)
at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1663)
at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:298)
at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal(BinaryMarshaller.java:109)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access[=11=]0(DataStreamProcessor.java:50)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.onMessage(DataStreamProcessor.java:80)
at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1238)
at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866)
at org.apache.ignite.internal.managers.communication.GridIoManager.access00(GridIoManager.java:106)
at org.apache.ignite.internal.managers.communication.GridIoManager.run(GridIoManager.java:829)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find class with given class loader for unmarshalling (make sure same versions of all classes are avai
lable on all nodes or enable peer-class-loading): sun.misc.Launcher$AppClassLoader@4e857327
at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:224)
at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1592)
... 13 more
Caused by: java.lang.ClassNotFoundException: gridgaingames.StreamingExample$StreamingExampleCacheEntryProcessor
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350)
at org.apache.ignite.internal.MarshallerContextAdapter.getClass(MarshallerContextAdapter.java:185)
at org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266)
at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)
at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)
at org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841)
at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218)
... 14 more
有几件事我无法得到。
1) 我该如何解决?
2) 因为这不是 "broadcast" 之类的东西,所以我认为仅在调用节点上点燃 运行s 流代码。看来我错了。那么我的 Streaming 代码在哪里执行?
3) 打印 "Finished" 行后我的代码没有停止。为什么?看起来一些非守护线程仍然存在。这是阻止我的客户端节点退出的流代码吗?
PS
对等类加载已启用。如果我 运行 一些在许多节点上执行代码的广播示例 - 它工作正常。
基本上 IgniteDataStreamer
在发送方(在您的示例中为客户端)准备数据批次,并立即将它们发送到应该存储特定键值元组的目标节点。请牢记这一点,您的问题的答案如下:
- 在将条目放入缓存之前,转换器在目标节点(服务器节点)上执行。这意味着服务器节点必须在其 class 路径中包含转换器的 class,或者,您必须启用 peer-class-loading。个人认为后者更灵活,更可取。
- 正如上面所解释的,发件人只需准备发送到部署了缓存的所有服务器的批次。服务器仅接收那些包含元组的批次,其中服务器是主元组或备份元组。
- 批量刷新在后台进行,因为
IgniteDataStreamer
用于快速数据预加载或复杂的流处理 (CEP)。有许多参数可让您调整刷新 - autoFlustFrequency
、perNodeBufferSize
.
最后,对于预加载需求(当缓存为空并且您需要填充它们时)我建议将 allowOverwrite
设置为 false
这将允许流媒体准备和发送批次主备节点分开。如果此参数设置为 true
,则批次仅在主节点上发送,主节点在更新其数据版本和相应的备份后使用基本 cache.put
操作注入数据。如果您只需要预加载缓存,这种方法会比较慢。
我正在尝试研究 Apache Ignite 流式处理的工作原理。我有 2 个节点集群设置(都在本地主机上),并且我启动了一个客户端节点,该节点使用 StreamTransformer 和 EntryProcessor 运行s 流代码。结果在我的一个节点中我得到无法反序列化异常。我的代码是来自 Ignite 文档的简化 WordCount 示例:
public class StreamingExample {`
public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> {
@Override
public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException {
Long val = e.getValue();
e.setValue(val == null ? 1L : val + 1);
return null;
}
}
public static void main(String[] args) throws IgniteException, IOException {
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache");
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
stmr.allowOverwrite(true);
stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor()));
stmr.addData("word", 1L);
System.out.println("Finished");
}
}
}
}
异常我得到一个两个节点之一是
[23:38:23] Topology snapshot [ver=5, servers=2, clients=1, CPUs=4, heap=3.3GB] Exception in thread "pub-#9%null%" class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595) at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1663) at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:298) at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal(BinaryMarshaller.java:109) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access[=11=]0(DataStreamProcessor.java:50) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.onMessage(DataStreamProcessor.java:80) at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1238) at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866) at org.apache.ignite.internal.managers.communication.GridIoManager.access00(GridIoManager.java:106) at org.apache.ignite.internal.managers.communication.GridIoManager.run(GridIoManager.java:829) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find class with given class loader for unmarshalling (make sure same versions of all classes are avai lable on all nodes or enable peer-class-loading): sun.misc.Launcher$AppClassLoader@4e857327 at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:224) at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1592) ... 13 more Caused by: java.lang.ClassNotFoundException: gridgaingames.StreamingExample$StreamingExampleCacheEntryProcessor at java.net.URLClassLoader.run(URLClassLoader.java:366) at java.net.URLClassLoader.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350) at org.apache.ignite.internal.MarshallerContextAdapter.getClass(MarshallerContextAdapter.java:185) at org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579) at org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218) ... 14 more
有几件事我无法得到。
1) 我该如何解决?
2) 因为这不是 "broadcast" 之类的东西,所以我认为仅在调用节点上点燃 运行s 流代码。看来我错了。那么我的 Streaming 代码在哪里执行?
3) 打印 "Finished" 行后我的代码没有停止。为什么?看起来一些非守护线程仍然存在。这是阻止我的客户端节点退出的流代码吗?
PS
对等类加载已启用。如果我 运行 一些在许多节点上执行代码的广播示例 - 它工作正常。
基本上 IgniteDataStreamer
在发送方(在您的示例中为客户端)准备数据批次,并立即将它们发送到应该存储特定键值元组的目标节点。请牢记这一点,您的问题的答案如下:
- 在将条目放入缓存之前,转换器在目标节点(服务器节点)上执行。这意味着服务器节点必须在其 class 路径中包含转换器的 class,或者,您必须启用 peer-class-loading。个人认为后者更灵活,更可取。
- 正如上面所解释的,发件人只需准备发送到部署了缓存的所有服务器的批次。服务器仅接收那些包含元组的批次,其中服务器是主元组或备份元组。
- 批量刷新在后台进行,因为
IgniteDataStreamer
用于快速数据预加载或复杂的流处理 (CEP)。有许多参数可让您调整刷新 -autoFlustFrequency
、perNodeBufferSize
.
最后,对于预加载需求(当缓存为空并且您需要填充它们时)我建议将 allowOverwrite
设置为 false
这将允许流媒体准备和发送批次主备节点分开。如果此参数设置为 true
,则批次仅在主节点上发送,主节点在更新其数据版本和相应的备份后使用基本 cache.put
操作注入数据。如果您只需要预加载缓存,这种方法会比较慢。