在 Hazelcast Jet 处理器上使用 Hazelcast IMap
Use Hazelcast IMap at Hazelcast Jet Processor
我刚刚开始学习 Hazelcast Jet。
我的来源是 UDP 数据报。我想在 Jet 的一些节点上并行处理,并通过 'domain' 重新发送到其他地址。
我想将 Hazelcast IMDG IMap 与加载器一起使用,通过 'source ip' 获得 'domain'。
DAG dag = new DAG();
Vertex source = dag.newVertex("datagram-source",
UdpSocketP.supplier("0.0.0.0", 41813));
source.localParallelism(1);
Vertex mapper = dag.newVertex("map",
map(new DomainMapper(instance.getMap("mysqlNas"))));
Vertex sink = dag.newVertex("sink",
Sinks.writeFile("logs"));
sink.localParallelism(1);
但是当我尝试在 DistributedFunction 中使用 IMap 时出现异常
Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator
DomainMapper 代码:
package org.eltex.softwlc.sorm.replicator;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;
import java.io.Serializable;
import java.net.DatagramPacket;
/**
* Created by mickey on 21.07.17.
*/
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {
private final IMap<String, NasValue> map;
public DomainMapper(IMap<String, NasValue> map) {
this.map = map;
}
@Override
public IpData apply(DatagramPacket datagramPacket) {
final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
System.out.println(d);
final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
if (nasValue!=null) {
d.setDomain(nasValue.getDomain());
}
return d;
}
}
我的错误是什么?
或者 Hazelcast Jet 对我来说是错误的选择。
问题是您试图在函数内序列化整个 IMap
。一个直接的解决方法是编写一个自定义处理器,该处理器可以访问其 init()
方法中的 Hazelcast Jet 实例,并从中查找其 IMap。由于 init()
代码是在目标成员上执行的,在所有反序列化之后,这将起作用。
但是,在更一般的层面上,您的目标似乎属于 "data enrichment" 类型。我们希望在 Jet 中支持这一点的方式是通过 "hash join" 操作,目前不是 first-class;但是有一个代码示例显示了该方法。您可以将整个 IMap
内容汇集到一个顶点,将其变成一个普通的 HashMap
并分发给所有丰富的处理器,或者您可以准备一个 Hazelcast ReplicatedMap
将直接使用通过丰富的处理器。
第一种方法意味着您针对 IMap
的快照进行工作;在第二个中,您可以继续更新 ReplicatedMap
,因为工作是 运行。
最好去检查样本:HashMapEnrichment and ReplicatedMapEnrichment。
我刚刚开始学习 Hazelcast Jet。 我的来源是 UDP 数据报。我想在 Jet 的一些节点上并行处理,并通过 'domain' 重新发送到其他地址。 我想将 Hazelcast IMDG IMap 与加载器一起使用,通过 'source ip' 获得 'domain'。
DAG dag = new DAG();
Vertex source = dag.newVertex("datagram-source",
UdpSocketP.supplier("0.0.0.0", 41813));
source.localParallelism(1);
Vertex mapper = dag.newVertex("map",
map(new DomainMapper(instance.getMap("mysqlNas"))));
Vertex sink = dag.newVertex("sink",
Sinks.writeFile("logs"));
sink.localParallelism(1);
但是当我尝试在 DistributedFunction 中使用 IMap 时出现异常
Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator
DomainMapper 代码:
package org.eltex.softwlc.sorm.replicator;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;
import java.io.Serializable;
import java.net.DatagramPacket;
/**
* Created by mickey on 21.07.17.
*/
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {
private final IMap<String, NasValue> map;
public DomainMapper(IMap<String, NasValue> map) {
this.map = map;
}
@Override
public IpData apply(DatagramPacket datagramPacket) {
final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
System.out.println(d);
final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
if (nasValue!=null) {
d.setDomain(nasValue.getDomain());
}
return d;
}
}
我的错误是什么? 或者 Hazelcast Jet 对我来说是错误的选择。
问题是您试图在函数内序列化整个 IMap
。一个直接的解决方法是编写一个自定义处理器,该处理器可以访问其 init()
方法中的 Hazelcast Jet 实例,并从中查找其 IMap。由于 init()
代码是在目标成员上执行的,在所有反序列化之后,这将起作用。
但是,在更一般的层面上,您的目标似乎属于 "data enrichment" 类型。我们希望在 Jet 中支持这一点的方式是通过 "hash join" 操作,目前不是 first-class;但是有一个代码示例显示了该方法。您可以将整个 IMap
内容汇集到一个顶点,将其变成一个普通的 HashMap
并分发给所有丰富的处理器,或者您可以准备一个 Hazelcast ReplicatedMap
将直接使用通过丰富的处理器。
第一种方法意味着您针对 IMap
的快照进行工作;在第二个中,您可以继续更新 ReplicatedMap
,因为工作是 运行。
最好去检查样本:HashMapEnrichment and ReplicatedMapEnrichment。