Using a Collections$UnmodifiableCollection with Apache Flink
I am using Apache Flink with the following code:
DataStream<List<String>> result = source
        .window(Time.of(1, TimeUnit.SECONDS))
        .mapWindow(new WindowMapFunction<String, List<String>>() {
            @Override
            public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
                // Guava's greatestOf returns an unmodifiable list, which is what gets serialized downstream
                List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
                collector.collect(top5);
            }
        })
        .flatten();
I get this exception:
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
How can I use an UnmodifiableCollection with Flink?
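The problem is that Kryo's default CollectionSerializer cannot deserialize the collection again because the collection is unmodifiable: while reading, it adds the elements back one by one, and the .add() call fails.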
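To see the failure mode without Flink or Kryo, the following plain-Java sketch imitates, in simplified form, what a generic collection deserializer does: take an instance of the original collection type and add() each element back. The names `RebuildDemo` and `tryRebuild` are hypothetical, and Kryo's real CollectionSerializer also handles instantiation and element serializers, which are omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class RebuildDemo {
    // Hypothetical, simplified stand-in for a collection deserializer:
    // it receives an already-created target collection and adds every element back.
    // Returns true on success, false if the target rejects add().
    static <T> boolean tryRebuild(Collection<T> target, List<T> elements) {
        try {
            for (T element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as in the Flink stack trace.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("e", "d", "c");

        // A plain ArrayList accepts add(), so rebuilding works.
        System.out.println(tryRebuild(new ArrayList<String>(), elements));

        // An unmodifiable wrapper rejects add(), so rebuilding fails.
        List<String> unmodifiable = Collections.unmodifiableList(new ArrayList<String>());
        System.out.println(tryRebuild(unmodifiable, elements));
    }
}
```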
To solve this, we can use the UnmodifiableCollectionsSerializer from the kryo-serializers project. Flink depends on this project transitively, so there is no need to add it as a dependency.
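Conceptually, a serializer for unmodifiable wrappers avoids the failing add() by rebuilding the contents in a modifiable collection and only then wrapping the result again. The plain-Java sketch below shows just that idea; the names (`RewrapDemo`, `write`, `read`) and the "wire format" (a plain copy of the elements) are hypothetical stand-ins, not the kryo-serializers implementation, which works through Kryo's own input and output streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RewrapDemo {
    // Hypothetical "wire format": a copy of the elements taken from the wrapper.
    // Reading through an unmodifiable view is allowed; only mutation is blocked.
    static <T> List<T> write(List<T> unmodifiable) {
        return new ArrayList<T>(unmodifiable);
    }

    // Rebuild the contents in a modifiable list first, then wrap it again
    // so the result is unmodifiable, like the original value.
    static <T> List<T> read(List<T> wire) {
        List<T> rebuilt = new ArrayList<T>();
        for (T element : wire) {
            rebuilt.add(element); // add() succeeds on a plain ArrayList
        }
        return Collections.unmodifiableList(rebuilt);
    }

    public static void main(String[] args) {
        List<String> original = Collections.unmodifiableList(Arrays.asList("e", "d", "c"));
        List<String> copy = read(write(original));
        System.out.println(copy);                  // [e, d, c]
        System.out.println(copy.equals(original)); // true
        try {
            copy.add("x");
        } catch (UnsupportedOperationException e) {
            System.out.println("still unmodifiable");
        }
    }
}
```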
Next, we have to register the serializer with Flink's Kryo instance:
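import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// Package-private class, so it is loaded by name (Class.forName may throw ClassNotFoundException)
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);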
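Usually we would not need Class.forName() to register a serializer, but in this case java.util.Collections$UnmodifiableCollection is package-private, so we cannot refer to the class directly (for example, with a class literal).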
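The following plain-Java check (no Flink needed) illustrates the point: Class.forName can load the package-private `java.util.Collections$UnmodifiableCollection`, and the lists returned by `Collections.unmodifiableList` are subclasses of it. The subclass relationship is relevant because, as far as I know, Kryo also applies a default serializer registered for a class to that class's subclasses, which would explain why registering the base class covers the unmodifiable lists. The class name `UnmodifiableClassCheck` is hypothetical.

```java
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class UnmodifiableClassCheck {
    // Loads the JDK's package-private wrapper class by name; a class literal for it
    // would not compile outside the java.util package.
    static Class<?> unmodifiableCollectionClass() throws ClassNotFoundException {
        return Class.forName("java.util.Collections$UnmodifiableCollection");
    }

    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> base = unmodifiableCollectionClass();

        // Prints false: the nested class is not public.
        System.out.println(Modifier.isPublic(base.getModifiers()));

        // The runtime class of an unmodifiable list is a subclass of the base wrapper,
        // so a default serializer registered for the base class can match it.
        List<String> list = Collections.unmodifiableList(new ArrayList<String>());
        System.out.println(list.getClass().getName());
        System.out.println(base.isAssignableFrom(list.getClass()));
    }
}
```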
You can also try the following, since the kryo-serializers repository has not changed in the past few years:
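import org.apache.avro.generic.GenericData
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val avroKryoSerializerUtil = new AvroKryoSerializerUtils
avroKryoSerializerUtil.addAvroSerializersIfRequired(env.getConfig, classOf[GenericData.Record])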