在 Hazelcast Jet 处理器上使用 Hazelcast IMap

Use Hazelcast IMap at Hazelcast Jet Processor

我刚刚开始学习 Hazelcast Jet。 我的来源是 UDP 数据报。我想在 Jet 的一些节点上并行处理,并通过 'domain' 重新发送到其他地址。 我想将 Hazelcast IMDG IMap 与加载器一起使用,通过 'source ip' 获得 'domain'。

DAG dag = new DAG();        
Vertex source = dag.newVertex("datagram-source",
                UdpSocketP.supplier("0.0.0.0", 41813));
        source.localParallelism(1);

        Vertex mapper = dag.newVertex("map",
                map(new DomainMapper(instance.getMap("mysqlNas"))));

        Vertex sink = dag.newVertex("sink",
                Sinks.writeFile("logs"));
        sink.localParallelism(1);

但是当我尝试在 DistributedFunction 中使用 IMap 时出现异常

Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
    at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
    at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator

DomainMapper 代码:

package org.eltex.softwlc.sorm.replicator;

import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;

import java.io.Serializable;
import java.net.DatagramPacket;

/**
 * Created by mickey on 21.07.17.
 */
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {

    private final IMap<String, NasValue> map;

    public DomainMapper(IMap<String, NasValue> map) {
        this.map = map;
    }

    @Override
    public IpData apply(DatagramPacket datagramPacket) {
        final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
        System.out.println(d);

        final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
        if (nasValue!=null) {
            d.setDomain(nasValue.getDomain());
        }

        return d;
    }
}

我的错误是什么? 或者 Hazelcast Jet 对我来说是错误的选择。

问题是您试图在函数内序列化整个 IMap。一个直接的解决方法是编写一个自定义处理器,该处理器可以访问其 init() 方法中的 Hazelcast Jet 实例,并从中查找其 IMap。由于 init() 代码是在目标成员上执行的,在所有反序列化之后,这将起作用。

但是,在更一般的层面上,您的目标似乎属于 "data enrichment" 类型。我们希望在 Jet 中支持这一点的方式是通过 "hash join" 操作,目前不是 first-class;但是有一个代码示例显示了该方法。您可以将整个 IMap 内容汇集到一个顶点,将其变成一个普通的 HashMap 并分发给所有丰富的处理器,或者您可以准备一个 Hazelcast ReplicatedMap 将直接使用通过丰富的处理器。

第一种方法意味着您针对 IMap 的快照进行工作;在第二个中,您可以继续更新 ReplicatedMap,因为工作是 运行。

最好去检查样本:HashMapEnrichment and ReplicatedMapEnrichment