Hazelcast Jet "mapFn" 必须是可序列化错误
Hazelcast Jet "mapFn" must be Serializable Error
美好的一天,我在将 github 中的某些代码源实施到我的项目时遇到问题,所以...我正在尝试构建管道,然后创建 StreamStage 以读取源代码并构建一个带有项目内部方法的queuesink,我总是得到同样的错误“线程“main”中的异常java.lang.IllegalArgumentException:“mapFn”必须是可序列化的
我正在阅读有关 Hazelcast Jet 中序列化的文档,一切看起来都很完美我只是不知道项目中的问题是什么
这是属性和常量:
public class FraudDetectionRun implements Serializable{
private final static ILogger log = Logger.getLogger(FraudDetectionRun.class);
private static final String TXN_QUEUE_ID = PropertiesLoader.TXN_QUEUE_ID;
private static final String ACCOUNT_MAP = PropertiesLoader.ACCOUNT_MAP;
private static final String MERCHANT_MAP = PropertiesLoader.MERCHANT_MAP;
private static final String RULESRESULT_MAP = PropertiesLoader.RULESRESULT_MAP;
private HazelcastInstance clientInstance;
private JetInstance jet;
private static MerchantRuleEngine merchantRuleEngine;
private static HistoricalDataRuleEngine historicalRuleEngine;
private IMap<String, Merchant> merchantMap;
private IMap<String, Account> accountMap;
public static void main(String[] args) {
new FraudDetectionRun().start();
}
这是主要代码
` private void start() {
初始化();
Pipeline p = buildPipeline();
JobConfig jobConfig = new JobConfig();
jobConfig.setName("Fraud Detection Job");
jet.newJobIfAbsent(p, jobConfig);
}
private Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
StreamStage<Transaction> transaction = p.readFrom(buildQueueSource())
.withoutTimestamps()
.map(restValue -> transformToTransaction(restValue))
.setName("This is where we transfor the RestValue into a Transaction");
StreamStage<Transaction> appliyingMerchantRules = transaction.map(index -> applyMerchantRules(index))
.setName("We search for the method Apply Merchant to apply the rules to the transaction");
StreamStage<Transaction> rulesIntoTransaction = appliyingMerchantRules.map(index -> applyHistoricalTxnRules(index))
.setName("Apply Historical transactions rules");
rulesIntoTransaction.writeTo(Sinks.map(RULESRESULT_MAP, Transaction::getTransactionId, Transaction::getRulesResult));
rulesIntoTransaction.writeTo(buildQueueSink());
log.info(p.toDotString());
return p;
}
private String transformResultsToString(Transaction txn) {
RulesResult result = txn.getRulesResult();
return "txnID: "+txn.getTransactionId()+" "+result.getMerchantRisk()+" "+result.getTransactionRisk();
}
private Transaction applyHistoricalTxnRules(Transaction txn) {
log.info("Applying rules on historical data");
historicalRuleEngine.apply(txn, accountMap.get(txn.getAccountNumber()).getHistoricalTransactions());
return txn;
}
private Transaction applyMerchantRules(Transaction txn) {
log.info("Applying merchant rules");
merchantRuleEngine.apply(txn, merchantMap.get(txn.getMerchantId()));
return txn;
}
private static Transaction transformToTransaction(RestValue restValue) {
log.info("Applying transformToTransaction");
return TransactionBuilderUtil.transformToTransaction(new String(restValue.getValue()));
}
private Sink<? super Transaction> buildQueueSink() {
return SinkBuilder.sinkBuilder("queueSink",
jet -> jet.jetInstance().getHazelcastInstance().<String>getQueue("sink-queue"))
.<Transaction>receiveFn( (queue, txn)-> queue.add(transformResultsToString(txn)))
.build();
}
private StreamSource<RestValue> buildQueueSource() {
StreamSource<RestValue> source = SourceBuilder.<QueueContext<RestValue>>stream(TXN_QUEUE_ID, c -> new QueueContext<>(c.jetInstance().getHazelcastInstance().getQueue(TXN_QUEUE_ID)))
.<RestValue>fillBufferFn(QueueContext::fillBuffer)
.build();
return source;
}
static class QueueContext<T> extends AbstractCollection<T> {
static final int MAX_ELEMENTS = 1024;
IQueue<T> queue;
SourceBuilder.SourceBuffer<T> buf;
QueueContext(IQueue<T> queue) {
this.queue = queue;
}
void fillBuffer(SourceBuilder.SourceBuffer<T> buf) {
this.buf = buf;
queue.drainTo(this, MAX_ELEMENTS);
}
@Override
public boolean add(T item) {
buf.add(item);
return true;
}
@Override
public Iterator<T> iterator() {
throw new UnsupportedOperationException();
}
@Override
public int size() {
throw new UnsupportedOperationException();
}
}`
程序总是抛出这个错误
Exception in thread "main" java.lang.IllegalArgumentException: "mapFn" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:188)
at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachMap(ComputeStageImplBase.java:146)
at com.hazelcast.jet.impl.pipeline.StreamStageImpl.map(StreamStageImpl.java:87)
at com.hazelcast.certification.control.FraudDetectionRun.buildPipeline(FraudDetectionRun.java:115)
at com.hazelcast.certification.control.FraudDetectionRun.start(FraudDetectionRun.java:91)
at com.hazelcast.certification.control.FraudDetectionRun.main(FraudDetectionRun.java:50)
Caused by: java.io.NotSerializableException: com.hazelcast.client.impl.proxy.ClientMapProxy
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:186)
... 5 more
看来问题就出在这里
StreamStage<Transaction> appliyingMerchantRules = transaction.map(index -> applyMerchantRules(index))
出于某种原因,¿你能告诉我我做错了什么吗?
您的 applyMerchantRules
方法是非静态的,因此 lambda 捕获封闭的 this
实例。似乎您已将 FraudDetectionRun
序列化以尝试解决此问题,但您应该将 applyMerchantRules
设为静态。
美好的一天,我在将 github 中的某些代码源实施到我的项目时遇到问题,所以...我正在尝试构建管道,然后创建 StreamStage 以读取源代码并构建一个带有项目内部方法的queuesink,我总是得到同样的错误“线程“main”中的异常java.lang.IllegalArgumentException:“mapFn”必须是可序列化的
我正在阅读有关 Hazelcast Jet 中序列化的文档,一切看起来都很完美我只是不知道项目中的问题是什么
这是属性和常量:
public class FraudDetectionRun implements Serializable{
private final static ILogger log = Logger.getLogger(FraudDetectionRun.class);
private static final String TXN_QUEUE_ID = PropertiesLoader.TXN_QUEUE_ID;
private static final String ACCOUNT_MAP = PropertiesLoader.ACCOUNT_MAP;
private static final String MERCHANT_MAP = PropertiesLoader.MERCHANT_MAP;
private static final String RULESRESULT_MAP = PropertiesLoader.RULESRESULT_MAP;
private HazelcastInstance clientInstance;
private JetInstance jet;
private static MerchantRuleEngine merchantRuleEngine;
private static HistoricalDataRuleEngine historicalRuleEngine;
private IMap<String, Merchant> merchantMap;
private IMap<String, Account> accountMap;
public static void main(String[] args) {
new FraudDetectionRun().start();
}
这是主要代码
` private void start() { 初始化();
Pipeline p = buildPipeline();
JobConfig jobConfig = new JobConfig();
jobConfig.setName("Fraud Detection Job");
jet.newJobIfAbsent(p, jobConfig);
}
private Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
StreamStage<Transaction> transaction = p.readFrom(buildQueueSource())
.withoutTimestamps()
.map(restValue -> transformToTransaction(restValue))
.setName("This is where we transfor the RestValue into a Transaction");
StreamStage<Transaction> appliyingMerchantRules = transaction.map(index -> applyMerchantRules(index))
.setName("We search for the method Apply Merchant to apply the rules to the transaction");
StreamStage<Transaction> rulesIntoTransaction = appliyingMerchantRules.map(index -> applyHistoricalTxnRules(index))
.setName("Apply Historical transactions rules");
rulesIntoTransaction.writeTo(Sinks.map(RULESRESULT_MAP, Transaction::getTransactionId, Transaction::getRulesResult));
rulesIntoTransaction.writeTo(buildQueueSink());
log.info(p.toDotString());
return p;
}
private String transformResultsToString(Transaction txn) {
RulesResult result = txn.getRulesResult();
return "txnID: "+txn.getTransactionId()+" "+result.getMerchantRisk()+" "+result.getTransactionRisk();
}
private Transaction applyHistoricalTxnRules(Transaction txn) {
log.info("Applying rules on historical data");
historicalRuleEngine.apply(txn, accountMap.get(txn.getAccountNumber()).getHistoricalTransactions());
return txn;
}
private Transaction applyMerchantRules(Transaction txn) {
log.info("Applying merchant rules");
merchantRuleEngine.apply(txn, merchantMap.get(txn.getMerchantId()));
return txn;
}
private static Transaction transformToTransaction(RestValue restValue) {
log.info("Applying transformToTransaction");
return TransactionBuilderUtil.transformToTransaction(new String(restValue.getValue()));
}
private Sink<? super Transaction> buildQueueSink() {
return SinkBuilder.sinkBuilder("queueSink",
jet -> jet.jetInstance().getHazelcastInstance().<String>getQueue("sink-queue"))
.<Transaction>receiveFn( (queue, txn)-> queue.add(transformResultsToString(txn)))
.build();
}
private StreamSource<RestValue> buildQueueSource() {
StreamSource<RestValue> source = SourceBuilder.<QueueContext<RestValue>>stream(TXN_QUEUE_ID, c -> new QueueContext<>(c.jetInstance().getHazelcastInstance().getQueue(TXN_QUEUE_ID)))
.<RestValue>fillBufferFn(QueueContext::fillBuffer)
.build();
return source;
}
static class QueueContext<T> extends AbstractCollection<T> {
static final int MAX_ELEMENTS = 1024;
IQueue<T> queue;
SourceBuilder.SourceBuffer<T> buf;
QueueContext(IQueue<T> queue) {
this.queue = queue;
}
void fillBuffer(SourceBuilder.SourceBuffer<T> buf) {
this.buf = buf;
queue.drainTo(this, MAX_ELEMENTS);
}
@Override
public boolean add(T item) {
buf.add(item);
return true;
}
@Override
public Iterator<T> iterator() {
throw new UnsupportedOperationException();
}
@Override
public int size() {
throw new UnsupportedOperationException();
}
}`
程序总是抛出这个错误
Exception in thread "main" java.lang.IllegalArgumentException: "mapFn" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:188)
at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachMap(ComputeStageImplBase.java:146)
at com.hazelcast.jet.impl.pipeline.StreamStageImpl.map(StreamStageImpl.java:87)
at com.hazelcast.certification.control.FraudDetectionRun.buildPipeline(FraudDetectionRun.java:115)
at com.hazelcast.certification.control.FraudDetectionRun.start(FraudDetectionRun.java:91)
at com.hazelcast.certification.control.FraudDetectionRun.main(FraudDetectionRun.java:50)
Caused by: java.io.NotSerializableException: com.hazelcast.client.impl.proxy.ClientMapProxy
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:186)
... 5 more
看来问题就出在这里
StreamStage<Transaction> appliyingMerchantRules = transaction.map(index -> applyMerchantRules(index))
出于某种原因,¿你能告诉我我做错了什么吗?
您的 applyMerchantRules
方法是非静态的,因此 lambda 捕获封闭的 this
实例。似乎您已将 FraudDetectionRun
序列化以尝试解决此问题,但您应该将 applyMerchantRules
设为静态。