在 hazelcast jet 中序列化
Serialization in the hazelcast jet
当我尝试处理来自 hazelcast jet 的数据时出现以下错误。
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at com.example.LearnJet.joins.LeftJoins.$deserializeLambda$(LeftJoins.java:1)
... 59 more
这是代码:-
AddToCart1 实例
public class AddToCart1 implements Serializable {
private int number;
private String cart;
public AddToCart1() {
super();
// TODO Auto-generated constructor stub
}
public int getNumber() {
return number;
}
public AddToCart1(int number, String cart) {
super();
this.number = number;
this.cart = cart;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((cart == null) ? 0 : cart.hashCode());
result = prime * result + number;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AddToCart1 other = (AddToCart1) obj;
if (cart == null) {
if (other.cart != null)
return false;
} else if (!cart.equals(other.cart))
return false;
if (number != other.number)
return false;
return true;
}
public void setNumber(int number) {
this.number = number;
}
public String getCart() {
return cart;
}
public void setCart(String cart) {
this.cart = cart;
}
}
PageVisit1 实例
public class PageVisit1 implements Serializable {
/**
*
*/
private int number;
private String pageName;
public PageVisit1() {
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + number;
result = prime * result + ((pageName == null) ? 0 : pageName.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PageVisit1 other = (PageVisit1) obj;
if (number != other.number)
return false;
if (pageName == null) {
if (other.pageName != null)
return false;
} else if (!pageName.equals(other.pageName))
return false;
return true;
}
public PageVisit1(int number, String pageName) {
super();
this.number = number;
this.pageName = pageName;
}
/**
* @return the number
*/
public int getNumber() {
return number;
}
/**
* @param number the number to set
*/
public void setNumber(int number) {
this.number = number;
}
/**
* @return the pageName
*/
public String getPageName() {
return pageName;
}
/**
* @param pageName the pageName to set
*/
public void setPageName(String pageName) {
this.pageName = pageName;
}
}
这里是主要内容class
public class LeftJoins {
public static void main(String[] args) throws InvocationTargetException {
JetInstance jet = Jet.bootstrappedInstance();
IList<AddToCart1> addToCartList = jet.getList("cart");
IList<PageVisit1> paymentList = jet.getList("page");
// AddToCartData
AddToCart1 ad1 = new AddToCart1();
ad1.setNumber(1);
ad1.setCart("lulu bazar");
AddToCart1 ad2 = new AddToCart1();
ad2.setNumber(2);
ad2.setCart("krishna bazar");
AddToCart1 ad3 = new AddToCart1();
ad3.setNumber(3);
ad3.setCart("ram bazar");
addToCartList.add(ad1);
addToCartList.add(ad2);
addToCartList.add(ad3);
// Page Data
PageVisit1 pg1 = new PageVisit1();
pg1.setNumber(1);
pg1.setPageName("k login");
PageVisit1 pg2 = new PageVisit1();
pg2.setNumber(2);
pg2.setPageName("plogin");
paymentList.add(pg1);
paymentList.add(pg2);
// creating a piple-line here
Pipeline p = Pipeline.create();
BatchStageWithKey<AddToCart1, Object> cart = p.readFrom(Sources.<AddToCart1>list("cart"))
.groupingKey(cart1 -> cart1.getNumber());
BatchStageWithKey<PageVisit1, Object> page = p.readFrom(Sources.<PageVisit1>list("page"))
.groupingKey(page1 -> page1.getNumber());
BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> joinedLists1 = page.aggregate2(toList(), cart, toList())
.map(Entry::getValue);
BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> m = joinedLists1.filter(pair -> !pair.f0().isEmpty());
m.writeTo(Sinks.logger());
jet.newJob(p).join();
// joinedLists.filter(pair -> !pair.isEmpty());
}
代码显然不完整,因为没有 page
变量,所以 page.aggregate2(...)
不应该编译。
因此,我无法为您指出发生问题的确切行。但是,错误消息告诉您正在使用 JDK 中的“标准”lambda,而不是 Serializable
,而您应该使用 Jet 中的 lambda,它们是
请检查this package。
编辑:
我用上面的代码创建了一个专用的 GitHub project。
一切正常。 2 个元组在日志中正确显示:
09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@c544b8f8], [ch.frankel.so.AddToCart1@41c722ed])
09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@58fbcb54], [ch.frankel.so.AddToCart1@c666a2c4])
当我尝试处理来自 hazelcast jet 的数据时出现以下错误。
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at com.example.LearnJet.joins.LeftJoins.$deserializeLambda$(LeftJoins.java:1)
... 59 more
这是代码:- AddToCart1 实例
public class AddToCart1 implements Serializable {
private int number;
private String cart;
public AddToCart1() {
super();
// TODO Auto-generated constructor stub
}
public int getNumber() {
return number;
}
public AddToCart1(int number, String cart) {
super();
this.number = number;
this.cart = cart;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((cart == null) ? 0 : cart.hashCode());
result = prime * result + number;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AddToCart1 other = (AddToCart1) obj;
if (cart == null) {
if (other.cart != null)
return false;
} else if (!cart.equals(other.cart))
return false;
if (number != other.number)
return false;
return true;
}
public void setNumber(int number) {
this.number = number;
}
public String getCart() {
return cart;
}
public void setCart(String cart) {
this.cart = cart;
}
}
PageVisit1 实例
public class PageVisit1 implements Serializable {
/**
*
*/
private int number;
private String pageName;
public PageVisit1() {
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + number;
result = prime * result + ((pageName == null) ? 0 : pageName.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PageVisit1 other = (PageVisit1) obj;
if (number != other.number)
return false;
if (pageName == null) {
if (other.pageName != null)
return false;
} else if (!pageName.equals(other.pageName))
return false;
return true;
}
public PageVisit1(int number, String pageName) {
super();
this.number = number;
this.pageName = pageName;
}
/**
* @return the number
*/
public int getNumber() {
return number;
}
/**
* @param number the number to set
*/
public void setNumber(int number) {
this.number = number;
}
/**
* @return the pageName
*/
public String getPageName() {
return pageName;
}
/**
* @param pageName the pageName to set
*/
public void setPageName(String pageName) {
this.pageName = pageName;
}
}
这里是主要内容class
public class LeftJoins {
public static void main(String[] args) throws InvocationTargetException {
JetInstance jet = Jet.bootstrappedInstance();
IList<AddToCart1> addToCartList = jet.getList("cart");
IList<PageVisit1> paymentList = jet.getList("page");
// AddToCartData
AddToCart1 ad1 = new AddToCart1();
ad1.setNumber(1);
ad1.setCart("lulu bazar");
AddToCart1 ad2 = new AddToCart1();
ad2.setNumber(2);
ad2.setCart("krishna bazar");
AddToCart1 ad3 = new AddToCart1();
ad3.setNumber(3);
ad3.setCart("ram bazar");
addToCartList.add(ad1);
addToCartList.add(ad2);
addToCartList.add(ad3);
// Page Data
PageVisit1 pg1 = new PageVisit1();
pg1.setNumber(1);
pg1.setPageName("k login");
PageVisit1 pg2 = new PageVisit1();
pg2.setNumber(2);
pg2.setPageName("plogin");
paymentList.add(pg1);
paymentList.add(pg2);
// creating a piple-line here
Pipeline p = Pipeline.create();
BatchStageWithKey<AddToCart1, Object> cart = p.readFrom(Sources.<AddToCart1>list("cart"))
.groupingKey(cart1 -> cart1.getNumber());
BatchStageWithKey<PageVisit1, Object> page = p.readFrom(Sources.<PageVisit1>list("page"))
.groupingKey(page1 -> page1.getNumber());
BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> joinedLists1 = page.aggregate2(toList(), cart, toList())
.map(Entry::getValue);
BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> m = joinedLists1.filter(pair -> !pair.f0().isEmpty());
m.writeTo(Sinks.logger());
jet.newJob(p).join();
// joinedLists.filter(pair -> !pair.isEmpty());
}
代码显然不完整,因为没有 page
变量,所以 page.aggregate2(...)
不应该编译。
因此,我无法为您指出发生问题的确切行。但是,错误消息告诉您正在使用 JDK 中的“标准”lambda,而不是 Serializable
,而您应该使用 Jet 中的 lambda,它们是
请检查this package。
编辑:
我用上面的代码创建了一个专用的 GitHub project。
一切正常。 2 个元组在日志中正确显示:
09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@c544b8f8], [ch.frankel.so.AddToCart1@41c722ed])
09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@58fbcb54], [ch.frankel.so.AddToCart1@c666a2c4])