Apache Ignite:与数据流相关的序列化错误

Apache Ignite: Serialization error related to Data Streaming

我正在尝试研究 Apache Ignite 流式处理的工作原理。我有 2 个节点集群设置(都在本地主机上),并且我启动了一个客户端节点,该节点使用 StreamTransformer 和 EntryProcessor 运行s 流代码。结果在我的一个节点中我得到无法反序列化异常。我的代码是来自 Ignite 文档的简化 WordCount 示例:

public class StreamingExample {`
public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> {
    @Override
    public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException {
        Long val = e.getValue();
        e.setValue(val == null ? 1L : val + 1);
        return null;
    }
}

public static void main(String[] args) throws IgniteException, IOException {
    Ignition.setClientMode(true);
    try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
        IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache");
        try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
            stmr.allowOverwrite(true);
            stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor()));
            stmr.addData("word", 1L);
            System.out.println("Finished");
        }
    }
}

}

异常我得到一个两个节点之一是

[23:38:23] Topology snapshot [ver=5, servers=2, clients=1, CPUs=4, heap=3.3GB] Exception in thread "pub-#9%null%" class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595) at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1663) at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:298) at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal(BinaryMarshaller.java:109) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access[=11=]0(DataStreamProcessor.java:50) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.onMessage(DataStreamProcessor.java:80) at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1238) at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866) at org.apache.ignite.internal.managers.communication.GridIoManager.access00(GridIoManager.java:106) at org.apache.ignite.internal.managers.communication.GridIoManager.run(GridIoManager.java:829) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find class with given class loader for unmarshalling (make sure same versions of all classes are avai lable on all nodes or enable peer-class-loading): sun.misc.Launcher$AppClassLoader@4e857327 at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:224) at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1592) ... 13 more Caused by: java.lang.ClassNotFoundException: gridgaingames.StreamingExample$StreamingExampleCacheEntryProcessor at java.net.URLClassLoader.run(URLClassLoader.java:366) at java.net.URLClassLoader.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350) at org.apache.ignite.internal.MarshallerContextAdapter.getClass(MarshallerContextAdapter.java:185) at org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579) at org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218) ... 14 more

有几件事我无法得到。

1) 我该如何解决?

2) 因为这不是 "broadcast" 之类的东西,所以我认为仅在调用节点上点燃 运行s 流代码。看来我错了。那么我的 Streaming 代码在哪里执行?

3) 打印 "Finished" 行后我的代码没有停止。为什么?看起来一些非守护线程仍然存在。这是阻止我的客户端节点退出的流代码吗?

PS

对等类加载已启用。如果我 运行 一些在许多节点上执行代码的广播示例 - 它工作正常。

基本上 IgniteDataStreamer 在发送方(在您的示例中为客户端)准备数据批次,并立即将它们发送到应该存储特定键值元组的目标节点。请牢记这一点,您的问题的答案如下:

  1. 在将条目放入缓存之前,转换器在目标节点(服务器节点)上执行。这意味着服务器节点必须在其 class 路径中包含转换器的 class,或者,您必须启用 peer-class-loading。个人认为后者更灵活,更可取。
  2. 正如上面所解释的,发件人只需准备发送到部署了缓存的所有服务器的批次。服务器仅接收那些包含元组的批次,其中服务器是主元组或备份元组。
  3. 批量刷新在后台进行,因为 IgniteDataStreamer 用于快速数据预加载或复杂的流处理 (CEP)。有许多参数可让您调整刷新 - autoFlustFrequencyperNodeBufferSize.

最后,对于预加载需求(当缓存为空并且您需要填充它们时)我建议将 allowOverwrite 设置为 false 这将允许流媒体准备和发送批次主备节点分开。如果此参数设置为 true,则批次仅在主节点上发送,主节点在更新其数据版本和相应的备份后使用基本 cache.put 操作注入数据。如果您只需要预加载缓存,这种方法会比较慢。