Java Spark 数据集 MapFunction - 任务在没有任何参考的情况下不可序列化 class
Java Spark Dataset MapFunction - Task not serializable without any reference to class
我有一个关注 class 将 csv 数据读入 Spark 的 Dataset
。如果我只是简单地阅读 return data
.
一切正常
但是,如果我在从函数 returning 之前将 MapFunction
应用于 data
,我会得到
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: com.Workflow
.
我知道 Spark 的工作原理及其需要序列化对象以进行分布式处理,但是,我没有在我的映射逻辑中使用任何对 Workflow
class 的引用。我没有在我的映射逻辑中调用任何 Workflow
class 函数。那么为什么 Spark 试图序列化 Workflow
class?任何帮助将不胜感激。
public class Workflow {
private final SparkSession spark;
public Dataset<Row> readData(){
final StructType schema = new StructType()
.add("text", "string", false)
.add("category", "string", false);
Dataset<Row> data = spark.read()
.schema(schema)
.csv(dataPath);
/*
* works fine till here if I call
* return data;
*/
Dataset<Row> cleanedData = data.map(new MapFunction<Row, Row>() {
public Row call(Row row){
/* some mapping logic */
return row;
}
}, RowEncoder.apply(schema));
cleanedData.printSchema();
/* .... ERROR .... */
cleanedData.show();
return cleanedData;
}
}
您可以让 Workflow 将 Serializeble 和 SparkSession 实现为 @transient
匿名内部 classes 有一个 hidden/implicit 引用包围 class。使用 Lambda 表达式或使用 Roma Anankin 的解决方案
我有一个关注 class 将 csv 数据读入 Spark 的 Dataset
。如果我只是简单地阅读 return data
.
但是,如果我在从函数 returning 之前将 MapFunction
应用于 data
,我会得到
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: com.Workflow
.
我知道 Spark 的工作原理及其需要序列化对象以进行分布式处理,但是,我没有在我的映射逻辑中使用任何对 Workflow
class 的引用。我没有在我的映射逻辑中调用任何 Workflow
class 函数。那么为什么 Spark 试图序列化 Workflow
class?任何帮助将不胜感激。
public class Workflow {
private final SparkSession spark;
public Dataset<Row> readData(){
final StructType schema = new StructType()
.add("text", "string", false)
.add("category", "string", false);
Dataset<Row> data = spark.read()
.schema(schema)
.csv(dataPath);
/*
* works fine till here if I call
* return data;
*/
Dataset<Row> cleanedData = data.map(new MapFunction<Row, Row>() {
public Row call(Row row){
/* some mapping logic */
return row;
}
}, RowEncoder.apply(schema));
cleanedData.printSchema();
/* .... ERROR .... */
cleanedData.show();
return cleanedData;
}
}
您可以让 Workflow 将 Serializeble 和 SparkSession 实现为 @transient
匿名内部 classes 有一个 hidden/implicit 引用包围 class。使用 Lambda 表达式或使用 Roma Anankin 的解决方案