Hazelcast throws java.io.NotSerializableException for a predicate-based Supplier in an aggregation

Below is the code I use to run aggregations on Hazelcast through a Hazelcast client. The first aggregation works fine, but the second one throws java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment

My code:

public class AggregateExperiment {
    public void runTest(){
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addAddress("127.0.0.1:5701");
        clientConfig.setClassLoader(this.getClass().getClassLoader());
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        IMap<String, Integer> map = client.getMap("customers");
        Supplier<String, Integer, Integer> supplier = Supplier.all();
        // Choose the average aggregation
        Aggregation<String, Integer, Integer> aggregation = Aggregations.integerAvg();
        int average  =  map.aggregate(supplier, aggregation);
        System.out.println("Average of inputs = "+average);

        supplier = Supplier.fromKeyPredicate(new MyKeyPredicate());
        // Choose the sum aggregation
        aggregation = Aggregations.integerSum();
        average  =  map.aggregate(supplier, aggregation );
        System.out.println("Average of inputs = "+average);

    }

    public class MyKeyPredicate implements KeyPredicate<String> {
        public boolean evaluate(String key) {
            return Integer.parseInt(key) % 4 == 0;
        }
    }
}

Error message:

Average of inputs = 1
[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.hazelcast.core.HazelcastException: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:988)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
    at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
    at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:188)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.access[=12=]0(ClientInvocationFuture.java:41)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.run(ClientInvocationFuture.java:234)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
    at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at com.hazelcast.nio.serialization.SerializationServiceImpl.handleException(SerializationServiceImpl.java:380)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:307)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.KeyPredicateSupplier.writeData(KeyPredicateSupplier.java:79)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper.writeData(SupplierConsumingMapper.java:75)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.writeData(ClientMapReduceRequest.java:204)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.write(ClientMapReduceRequest.java:173)
    at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86)
    at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207)
    at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104)
    at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnRandomTarget(ClientSmartInvocationServiceImpl.java:60)
    at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:163)
    at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:147)
    at com.hazelcast.client.proxy.ClientMapReduceProxy$ClientJob.invoke(ClientMapReduceProxy.java:124)
    at com.hazelcast.mapreduce.impl.AbstractJob.submit(AbstractJob.java:119)
    at com.hazelcast.mapreduce.impl.AbstractJob$ReducingSubmittableJobImpl.submit(AbstractJob.java:348)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:985)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
    at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
    at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
    at ------ End remote and begin local stack-trace ------.(Unknown Source)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175)
    ... 8 more
Caused by: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at com.hazelcast.nio.serialization.DefaultSerializers$ObjectSerializer.write(DefaultSerializers.java:223)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.KeyPredicateSupplier.writeData(KeyPredicateSupplier.java:79)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper.writeData(SupplierConsumingMapper.java:75)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.writeData(ClientMapReduceRequest.java:204)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.write(ClientMapReduceRequest.java:173)
    at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86)
    at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207)
    at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104)
    at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnRandomTarget(ClientSmartInvocationServiceImpl.java:60)
    at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:163)
    at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:147)
    at com.hazelcast.client.proxy.ClientMapReduceProxy$ClientJob.invoke(ClientMapReduceProxy.java:124)
    at com.hazelcast.mapreduce.impl.AbstractJob.submit(AbstractJob.java:119)
    at com.hazelcast.mapreduce.impl.AbstractJob$ReducingSubmittableJobImpl.submit(AbstractJob.java:348)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:985)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
    at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
    at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)

Any help is much appreciated. Thanks.

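An instance of the inner class MyKeyPredicate holds an implicit reference to its enclosing AggregateExperiment instance. When the predicate is serialized, Java serialization tries to write that enclosing instance as well, and AggregateExperiment is not Serializable, so serializing MyKeyPredicate fails. You should make MyKeyPredicate static: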

public static class MyKeyPredicate implements KeyPredicate<String> {
    ...
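
The effect can be reproduced without Hazelcast. The stack trace in the question shows the predicate being written through java.io.ObjectOutputStream, so the sketch below uses plain Java serialization only. All class and method names here (InnerClassSerializationDemo, InnerPredicate, StaticPredicate, canSerialize) are hypothetical and exist only for illustration; the inner class deliberately reads a field of the outer class so that it is guaranteed to hold the reference to its enclosing instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// The outer class is intentionally NOT Serializable, like AggregateExperiment.
public class InnerClassSerializationDemo {

    private int divisor = 4;

    // Non-static inner class: each instance holds a hidden reference to the
    // enclosing InnerClassSerializationDemo instance (it needs it to read divisor).
    public class InnerPredicate implements Serializable {
        public boolean evaluate(String key) {
            return Integer.parseInt(key) % divisor == 0;
        }
    }

    // Static nested class: no reference to any enclosing instance.
    public static class StaticPredicate implements Serializable {
        private final int divisor;

        public StaticPredicate(int divisor) {
            this.divisor = divisor;
        }

        public boolean evaluate(String key) {
            return Integer.parseInt(key) % divisor == 0;
        }
    }

    // Returns true if plain Java serialization accepts the object,
    // false if it fails with NotSerializableException.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InnerClassSerializationDemo outer = new InnerClassSerializationDemo();
        System.out.println("inner class serializable:  " + canSerialize(outer.new InnerPredicate()));
        System.out.println("static class serializable: " + canSerialize(new StaticPredicate(4)));
    }
}
```

Making the nested class static removes the hidden reference, so only the predicate's own fields are written. Any state the predicate needs from the outer object has to be passed in explicitly, as the constructor argument does here.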

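Hazelcast does not currently support distributed class loading, which the code above needs in order to work: the member that runs the aggregation must be able to load MyKeyPredicate from its own classpath to deserialize it.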

A GitHub issue has already been opened for this: https://github.com/hazelcast/hazelcast/issues/7394

For now, those who really want to use Hazelcast this way can also explore https://github.com/serkan-ozal/hermgen