Flink: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
When I submit a new Flink job, it throws:
Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 24 more
Here is my code:
JedisCluster jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
DataStream<MobileClickEvent> clickEventDataStream = environment.addSource(clickConsumer);
clickEventDataStream
    .filter(Objects::nonNull)
    .keyBy(new KeySelector<MobileClickEvent, String>() {
        @Override
        public String getKey(MobileClickEvent value) throws Exception {
            return value.getItemId() + "_" + value.getItemType();
        }
    })
    .process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
        @Override
        public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
            String key = ctx.getCurrentKey();
            jedisCluster.hincrBy("{item_feature}" + key, "click", 1);
            jedisCluster.expire("{item_feature}" + key, 60 * 10);
        }
    });
In the OP's answer, jedisCluster is initialized for every element. Consider overriding open instead and doing the initialization there. The Javadoc for open says:
"Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work."
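.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
    // Assigned in open(), so the field is still null when Flink serializes the function at submission.
    private JedisCluster jedisCluster;

    @Override
    public void open(Configuration parameters) {
        // Called before any element is processed, so the client is created once rather than per element.
        jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
    }

    @Override
    public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
        String key = ctx.getCurrentKey();
        // REDIS_PREFIX is assumed to be a constant holding "{item_feature}", as in the question's code.
        jedisCluster.hincrBy(REDIS_PREFIX + key, "click", 1);
        jedisCluster.expire(REDIS_PREFIX + key, 60 * 10);
    }
});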
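The stack trace shows Flink's ClosureCleaner passing the function through plain Java serialization (InstantiationUtil.serializeObject), which is why a captured JedisCluster fails at submission time. The following is a minimal sketch of that mechanism using only java.io, with no Flink or Jedis dependency. FakeClient, CapturingFunction, LazyFunction and canSerialize are hypothetical names invented for illustration; they are not part of either library.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Hypothetical stand-in for a client such as JedisCluster: it does NOT implement Serializable.
    static class FakeClient {
        void increment(String key) {
            // A real client would talk to a remote service here.
        }
    }

    // Mirrors the failing job: the client is already part of the function's state
    // when the function object is serialized.
    static class CapturingFunction implements Serializable {
        private final FakeClient client = new FakeClient();
    }

    // Mirrors the fix: the field is empty at serialization time and is filled in by open().
    static class LazyFunction implements Serializable {
        private transient FakeClient client;

        void open() {
            client = new FakeClient();
        }

        void process(String key) {
            client.increment(key);
        }
    }

    // Returns true if plain Java serialization accepts the object, false if it
    // rejects it with NotSerializableException.
    static boolean canSerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("capturing: " + canSerialize(new CapturingFunction())); // capturing: false
        System.out.println("lazy: " + canSerialize(new LazyFunction()));           // lazy: true
    }
}
```

In the answer's code the jedisCluster field is null when the job is submitted, and Java serialization writes a null reference without complaint, so the field does not strictly need to be transient. Marking it transient, as in this sketch, simply makes the intent explicit.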