获取异常 NotSerializableException:com.hazelcast.map.impl.proxy.MapProxyImpl
Getting and exception NotSerializableException: com.hazelcast.map.impl.proxy.MapProxyImpl
我是 hazelcast-jet 的新手,我的用例是从 Kafka 源读取并在检查其在 hazelcastIMDG 中的值后进行过滤。
我什至在创建管道之前就获取并加载了 IMDG 地图。见下文
IMap<String, Policy> policyMap =jet.getHazelcastInstance().getMap(POLICY_MAP_NAME);
Utility.populatePoliciesMap(policyMap);
在 buildPipeline 方法中将 policyMap 作为参数传递。
我已经创建了如下管道
StreamStage<TimestampedEntry<String, Long>> streamStage = pipeline.drawFrom(KafkaSources.kafka(brokerConsumerProperties(), projectionFn, getIngestTopic()))
.addTimestamps()
.flatMap(ingestData -> traverseArray(ingestData.getMapRequestParameterTree().toArray(new String[ingestData.getMapRequestParameterTree().size()])))
.filter(hash -> policyMap.get(hash)!=null)
.window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(10)))
.groupingKey(wholeItem())
.aggregate(counting())
.map((TimestampedEntry<String, Long> e) -> entry(e.getKey(), createBlacklistObjectEvent(Utility.fetchPolicy(e.getKey()), e.getTimestamp(), e.getValue())));
timestampedEntryStreamStage.drainTo(Sinks.map(BL_MAP_NAME));
但是有了这个我就低于异常
Exception in thread "main" java.lang.IllegalArgumentException:
"filterFn" must be serializable at
com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:301) at
com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachFilter(ComputeStageImplBase.java:129)
at
com.hazelcast.jet.impl.pipeline.StreamStageImpl.filter(StreamStageImpl.java:71)
at
com.visa.rls.handler.HazelcastJetIngetstResultHandler.buildPipeline(HazelcastJetIngetstResultHandler.java:120)
at
com.visa.rls.handler.HazelcastJetIngetstResultHandler.run(HazelcastJetIngetstResultHandler.java:84)
at
com.visa.rls.handler.HazelcastJetIngetstResultHandler.main(HazelcastJetIngetstResultHandler.java:58)
Caused by: java.io.NotSerializableException:
com.hazelcast.map.impl.proxy.MapProxyImpl at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299)
... 5 more
您在过滤器函数中使用了 policyMap
,但 IMap
不可序列化。它被 lambda 表达式捕获。您必须在每个远程成员上获取 IMap
实例,为此您可以使用 filterUsingContext
而不是 filter
:
.filterUsingContext(
ContextFactory.withCreateFn(jetInstance -> jetInstance.getMap(POLICY_MAP_NAME)),
(policyMap, hash) -> policyMap.get(hash) != null
)
我是 hazelcast-jet 的新手,我的用例是从 Kafka 源读取并在检查其在 hazelcastIMDG 中的值后进行过滤。
我什至在创建管道之前就获取并加载了 IMDG 地图。见下文
IMap<String, Policy> policyMap =jet.getHazelcastInstance().getMap(POLICY_MAP_NAME);
Utility.populatePoliciesMap(policyMap);
在 buildPipeline 方法中将 policyMap 作为参数传递。
我已经创建了如下管道
StreamStage<TimestampedEntry<String, Long>> streamStage = pipeline.drawFrom(KafkaSources.kafka(brokerConsumerProperties(), projectionFn, getIngestTopic()))
.addTimestamps()
.flatMap(ingestData -> traverseArray(ingestData.getMapRequestParameterTree().toArray(new String[ingestData.getMapRequestParameterTree().size()])))
.filter(hash -> policyMap.get(hash)!=null)
.window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(10)))
.groupingKey(wholeItem())
.aggregate(counting())
.map((TimestampedEntry<String, Long> e) -> entry(e.getKey(), createBlacklistObjectEvent(Utility.fetchPolicy(e.getKey()), e.getTimestamp(), e.getValue())));
timestampedEntryStreamStage.drainTo(Sinks.map(BL_MAP_NAME));
但是有了这个我就低于异常
Exception in thread "main" java.lang.IllegalArgumentException: "filterFn" must be serializable at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:301) at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachFilter(ComputeStageImplBase.java:129) at com.hazelcast.jet.impl.pipeline.StreamStageImpl.filter(StreamStageImpl.java:71) at com.visa.rls.handler.HazelcastJetIngetstResultHandler.buildPipeline(HazelcastJetIngetstResultHandler.java:120) at com.visa.rls.handler.HazelcastJetIngetstResultHandler.run(HazelcastJetIngetstResultHandler.java:84) at com.visa.rls.handler.HazelcastJetIngetstResultHandler.main(HazelcastJetIngetstResultHandler.java:58) Caused by: java.io.NotSerializableException: com.hazelcast.map.impl.proxy.MapProxyImpl at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 5 more
您在过滤器函数中使用了 policyMap
,但 IMap
不可序列化。它被 lambda 表达式捕获。您必须在每个远程成员上获取 IMap
实例,为此您可以使用 filterUsingContext
而不是 filter
:
.filterUsingContext(
ContextFactory.withCreateFn(jetInstance -> jetInstance.getMap(POLICY_MAP_NAME)),
(policyMap, hash) -> policyMap.get(hash) != null
)