在 hazelcast jet 中序列化

Serialization in Hazelcast Jet

当我尝试处理来自 hazelcast jet 的数据时出现以下错误。

Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at com.example.LearnJet.joins.LeftJoins.$deserializeLambda$(LeftJoins.java:1)
    ... 59 more

这是代码。AddToCart1 类:

/**
 * Serializable value object holding a number together with a cart name.
 *
 * <p>Two instances are equal when they share the same runtime class, the same
 * {@code cart} (compared null-safely) and the same {@code number}.
 */
public class AddToCart1 implements Serializable {
    /** The number carried by this entry. */
    private int number;
    /** The cart name; may be {@code null}. */
    private String cart;

    /** Creates an instance with {@code number == 0} and {@code cart == null}. */
    public AddToCart1() {
    }

    /**
     * Creates a fully initialised instance.
     *
     * @param number the number to store
     * @param cart the cart name to store; may be {@code null}
     */
    public AddToCart1(int number, String cart) {
        this.number = number;
        this.cart = cart;
    }

    /**
     * @return the number
     */
    public int getNumber() {
        return number;
    }

    /**
     * @param number the number to set
     */
    public void setNumber(int number) {
        this.number = number;
    }

    /**
     * @return the cart name, possibly {@code null}
     */
    public String getCart() {
        return cart;
    }

    /**
     * @param cart the cart name to set; may be {@code null}
     */
    public void setCart(String cart) {
        this.cart = cart;
    }

    /**
     * Compares the cart name first (null-safely) and then the number.
     *
     * @param obj the object to compare with
     * @return {@code true} only for an {@code AddToCart1} of the exact same class
     *         whose cart and number both match
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        AddToCart1 that = (AddToCart1) obj;
        boolean sameCart = (cart == null) ? that.cart == null : cart.equals(that.cart);
        return sameCart && number == that.number;
    }

    /**
     * Combines the cart hash (0 when the cart is {@code null}) and the number
     * with the multiplier 31, starting from a seed of 1.
     */
    @Override
    public int hashCode() {
        int cartHash = (cart != null) ? cart.hashCode() : 0;
        return 31 * (31 + cartHash) + number;
    }

}

PageVisit1 类:

/**
 * Serializable value object holding a number together with a page name.
 *
 * <p>Two instances are equal when they share the same runtime class, the same
 * {@code number} and the same {@code pageName} (compared null-safely).
 */
public class PageVisit1 implements Serializable {

    /** The number carried by this visit. */
    private int number;
    /** The page name; may be {@code null}. */
    private String pageName;

    /** Creates an instance with {@code number == 0} and {@code pageName == null}. */
    public PageVisit1() {
    }

    /**
     * Creates a fully initialised instance.
     *
     * @param number the number to store
     * @param pageName the page name to store; may be {@code null}
     */
    public PageVisit1(int number, String pageName) {
        this.number = number;
        this.pageName = pageName;
    }

    /**
     * @return the number
     */
    public int getNumber() {
        return number;
    }

    /**
     * @param number the number to set
     */
    public void setNumber(int number) {
        this.number = number;
    }

    /**
     * @return the page name, possibly {@code null}
     */
    public String getPageName() {
        return pageName;
    }

    /**
     * @param pageName the page name to set; may be {@code null}
     */
    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    /**
     * Compares the number first and then the page name (null-safely).
     *
     * @param obj the object to compare with
     * @return {@code true} only for a {@code PageVisit1} of the exact same class
     *         whose number and page name both match
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        PageVisit1 that = (PageVisit1) obj;
        if (number != that.number) {
            return false;
        }
        return (pageName == null) ? that.pageName == null : pageName.equals(that.pageName);
    }

    /**
     * Combines the number and the page-name hash (0 when the name is
     * {@code null}) with the multiplier 31, starting from a seed of 1.
     */
    @Override
    public int hashCode() {
        int nameHash = (pageName != null) ? pageName.hashCode() : 0;
        return 31 * (31 + number) + nameHash;
    }

}

这是主类(main class):


public class LeftJoins {

    public static void main(String[] args) throws InvocationTargetException {

        JetInstance jet = Jet.bootstrappedInstance();

        IList<AddToCart1> addToCartList = jet.getList("cart");
        IList<PageVisit1> paymentList = jet.getList("page");

        // AddToCartData
        AddToCart1 ad1 = new AddToCart1();
        ad1.setNumber(1);
        ad1.setCart("lulu bazar");

        AddToCart1 ad2 = new AddToCart1();
        ad2.setNumber(2);
        ad2.setCart("krishna bazar");

        AddToCart1 ad3 = new AddToCart1();
        ad3.setNumber(3);
        ad3.setCart("ram bazar");

        addToCartList.add(ad1);
        addToCartList.add(ad2);
        addToCartList.add(ad3);

        // Page Data
        PageVisit1 pg1 = new PageVisit1();
        pg1.setNumber(1);
        pg1.setPageName("k login");

        PageVisit1 pg2 = new PageVisit1();
        pg2.setNumber(2);
        pg2.setPageName("plogin");

        paymentList.add(pg1);
        paymentList.add(pg2);

        // creating a piple-line here
        Pipeline p = Pipeline.create();
        BatchStageWithKey<AddToCart1, Object> cart = p.readFrom(Sources.<AddToCart1>list("cart"))
                .groupingKey(cart1 -> cart1.getNumber());
        BatchStageWithKey<PageVisit1, Object> page = p.readFrom(Sources.<PageVisit1>list("page"))
                .groupingKey(page1 -> page1.getNumber());

        BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> joinedLists1 = page.aggregate2(toList(), cart, toList())
                .map(Entry::getValue);

        BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> m = joinedLists1.filter(pair -> !pair.f0().isEmpty());

        m.writeTo(Sinks.logger());

        jet.newJob(p).join();
        // joinedLists.filter(pair -> !pair.isEmpty());
    }

代码显然不完整,因为没有 page 变量,所以 page.aggregate2(...) 不应该编译。

因此,我无法为您指出发生问题的确切行。但是,错误消息表明您使用的是 JDK 中的“标准”lambda,它们不是 Serializable 的;而您应该使用 Jet 提供的 lambda(函数式接口),它们是 Serializable 的。

请参阅此包。

编辑:

我用上面的代码创建了一个专用的 GitHub 项目。

一切正常。 2 个元组在日志中正确显示:

09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@c544b8f8], [ch.frankel.so.AddToCart1@41c722ed])
09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@58fbcb54], [ch.frankel.so.AddToCart1@c666a2c4])