为什么我的 java lambda 表达式不能工作,而它的命令式风格可以正常工作?
Why my java lambda expression cannot work while its imperative style works properly?
我有多年 Java 8 及其 lambda 的经验。但是我在开发一个 hello-world-size 的 Spark 程序时遇到了一个疯狂的问题。
这里我有一个Javaclass,其中Data注解来自Lombok:
@Data
public class Person implements Serializable {
private String name;
private Long age;
}
然后我构建了一个 java 列表,其中包含 Persion
class:
的对象
Person p1 = new Person("sb", 1L);
Person p2 = new Person("sth", null);
List<Person> list = new ArrayList<>(2);
list.add(p1);
list.add(p2);
到目前为止还不错。然后我尝试使用列表生成一个 Spark 数据集:
SparkSession session = SparkSession.builder().master("local[1]").appName("SparkSqlApp").getOrCreate();
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> dataset1 = session.createDataset(list, personEncoder);
dataset1.foreach(new ForeachFunction<Person>() { // 1
@Override
public void call(Person person) throws Exception {
System.out.println(person);
}
});
dataset1.foreach((ForeachFunction<Person>) System.out::println); //2
请注意,块 1 等同于 java 中的块 2,块 2 由 IntelliJ IDEA 从块 1 简化而来。唯一的区别是块 2 使用的是 lambda 表达式。
然而,当我执行程序时,块 1 结束良好,而块 2 运行 异常:
什么……大地球大宇宙?为什么 JVM 或 Spark 引擎会这样做?!
接口 ForeachFunction 扩展 Serializable
。 Dataset.foreach(f)
可能正在序列化参数 f
。在下面的测试中,testBlock1
成功,testBlcok2
失败(NotSerializableException)。但是不知道为什么。
public class AAA implements Serializable {
@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
void call(T t) throws Exception;
}
@Test
public void testBlock1() throws FileNotFoundException, IOException {
ForeachFunction<String> functionBody = new ForeachFunction<String>() {
public void call(String t) throws Exception {
System.out.println(t);
}
};
try (FileOutputStream fos = new FileOutputStream("data/block1.obj");
ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(functionBody); // success
}
}
@Test
public void testBlock2() throws FileNotFoundException, IOException {
ForeachFunction<String> functionBody = System.out::println;
try (FileOutputStream fos = new FileOutputStream("data/block2.obj");
ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(functionBody); // fail (NotSerializableException)
}
}
}
如 中所述,方法引用 System.out::println
与 lambda 表达式 x -> System.out.println(x)
.
不同
方法引用 捕获 System.out
的当前值,以便在每次调用函数时对其调用 println
,而不是评估 System.out
每次都像 lambda 表达式的主体一样。
如前所述,这很少会产生影响,但在这里却有影响。当您尝试序列化该函数时,它会尝试序列化所有捕获的值,包括在实例化期间从 System.out
读取的 PrintStream
实例。 PrintStream
是不可序列化的,实现可序列化的 PrintStream
来满足预期是非常具有挑战性的。
但重要的是要记住,当您序列化 lambda 表达式 x -> System.out.println(x)
或等效的 class 对象并在不同的环境中反序列化它时,System.out
它将读取那里的评估结果 PrintStream
与您的原始环境不同。当分布式计算框架负责将打印到标准输出的所有内容通过管道返回给发起者时,这并不重要。
但重要的是要记住 static
不属于序列化数据的字段通常在不同的环境中可能具有不同的内容。
我有多年 Java 8 及其 lambda 的经验。但是我在开发一个 hello-world-size 的 Spark 程序时遇到了一个疯狂的问题。
这里我有一个Javaclass,其中Data注解来自Lombok:
@Data
public class Person implements Serializable {
private String name;
private Long age;
}
然后我构建了一个 java 列表,其中包含 Persion
class:
Person p1 = new Person("sb", 1L);
Person p2 = new Person("sth", null);
List<Person> list = new ArrayList<>(2);
list.add(p1);
list.add(p2);
到目前为止还不错。然后我尝试使用列表生成一个 Spark 数据集:
SparkSession session = SparkSession.builder().master("local[1]").appName("SparkSqlApp").getOrCreate();
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> dataset1 = session.createDataset(list, personEncoder);
dataset1.foreach(new ForeachFunction<Person>() { // 1
@Override
public void call(Person person) throws Exception {
System.out.println(person);
}
});
dataset1.foreach((ForeachFunction<Person>) System.out::println); //2
请注意,块 1 等同于 java 中的块 2,块 2 由 IntelliJ IDEA 从块 1 简化而来。唯一的区别是块 2 使用的是 lambda 表达式。
然而,当我执行程序时,块 1 结束良好,而块 2 运行 异常:
什么……大地球大宇宙?为什么 JVM 或 Spark 引擎会这样做?!
接口 ForeachFunction 扩展 Serializable
。 Dataset.foreach(f)
可能正在序列化参数 f
。在下面的测试中,testBlock1
成功,testBlcok2
失败(NotSerializableException)。但是不知道为什么。
public class AAA implements Serializable {
@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
void call(T t) throws Exception;
}
@Test
public void testBlock1() throws FileNotFoundException, IOException {
ForeachFunction<String> functionBody = new ForeachFunction<String>() {
public void call(String t) throws Exception {
System.out.println(t);
}
};
try (FileOutputStream fos = new FileOutputStream("data/block1.obj");
ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(functionBody); // success
}
}
@Test
public void testBlock2() throws FileNotFoundException, IOException {
ForeachFunction<String> functionBody = System.out::println;
try (FileOutputStream fos = new FileOutputStream("data/block2.obj");
ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(functionBody); // fail (NotSerializableException)
}
}
}
如 System.out::println
与 lambda 表达式 x -> System.out.println(x)
.
方法引用 捕获 System.out
的当前值,以便在每次调用函数时对其调用 println
,而不是评估 System.out
每次都像 lambda 表达式的主体一样。
如前所述,这很少会产生影响,但在这里却有影响。当您尝试序列化该函数时,它会尝试序列化所有捕获的值,包括在实例化期间从 System.out
读取的 PrintStream
实例。 PrintStream
是不可序列化的,实现可序列化的 PrintStream
来满足预期是非常具有挑战性的。
但重要的是要记住,当您序列化 lambda 表达式 x -> System.out.println(x)
或等效的 class 对象并在不同的环境中反序列化它时,System.out
它将读取那里的评估结果 PrintStream
与您的原始环境不同。当分布式计算框架负责将打印到标准输出的所有内容通过管道返回给发起者时,这并不重要。
但重要的是要记住 static
不属于序列化数据的字段通常在不同的环境中可能具有不同的内容。