Spark SQL UDF 任务不可序列化

Spark SQL UDF Task not serializable

Cassandra 和 DataStax 社区,我有一个问题希望有智慧的人可以帮助我。

我们正在将分析代码从 Hadoop 迁移到 Cassandra 之上的 Spark 运行(通过 DataStax Enterprise)。 DSE 4.7 在生产中,但 4.8 在开发中。

Java 7 个在生产中,Java 7/8 个在开发中。

我们需要几个 DataFrame 转换,我们认为通过 Spark SQLContext 针对内存中的 DataFrame 编写一个 UDF 就可以完成这项工作。其中主要的是:

  1. 我们数据的每个文本值都带有前缀和后缀“.即“一些数据”这很烦人,所以我们想清理每一个。
  2. 我们想添加一个包含由许多其他列组成的散列键的列。

我们的代码如下。这在没有在 sqlContext 中包含 UDF 调用的情况下运行良好,但是一旦添加它们,我们就会收到“任务不可序列化”错误

线程异常 "main" org.apache.spark.SparkException: 任务不可序列化

我试过将“implements Serializable”作为这个(以及许多其他 classes)的基础 class,这将错误 [​​=40=] 更改为下一个错误链,但是这会导致异常失败 class 不可序列化……这可能意味着我正朝着错误的方向前进。

我也尝试过将 UDF 实现为 lambda,这也会导致同样的错误。

如果有人能指出我做错了什么,将不胜感激!

public class entities implements Serializable{
    private spark_context m_spx = null;
    private DataFrame m_entities = null;
    private String m_timekey = null;

    public entities(spark_context _spx, String _timekey){
        m_spx = _spx;
        m_timekey = _timekey;
    }


    public DataFrame get_dimension(){
        if(m_entities == null) {

            DataFrame df = m_spx.get_flat_data(m_timekey).select("event", "url");

            //UDF to generate hashed ids
            UDF2 get_hashed_id = new UDF2<String, String, String>() {
                public String call(String o, String o2) throws Exception {
                    return o.concat(o2);
                }
            };


            //UDF to clean the " from strings
            UDF1 clean_string = new UDF1<String, String>() {
                public String call(String o) throws Exception {
                    return o.replace("\"","");
                }
            };


            //Get the Spark SQL Context from SC.
            SQLContext sqlContext = new SQLContext(m_spx.sc());


            //Register the UDFs
            sqlContext.udf().register("getid", get_hashed_id, DataTypes.StringType);
            sqlContext.udf().register("clean_string", clean_string, DataTypes.StringType);


            //Register the DF as a table.
            sqlContext.registerDataFrameAsTable(df, "entities");
            m_entities = sqlContext.sql("SELECT getid(event, url) as event_key, clean_string(event) as event_cleaned, clean_string(url) as url_cleaned FROM entities");
        }

        return m_entities;
    }
}

你的 entities class 包含一个 SparkContext 成员 - 所以它不能序列化(SparkContexts 在国际上是不可序列化的,你不应该序列化它们)。

由于 entities 不可序列化,因此其中任何一个 非静态 方法/成员/匿名内部 class 也不可序列化(因为他们会尝试序列化持有他们的 entities 实例)。

在这种情况下,最好的解决方法是将匿名 UDF 提取到 class:

static 成员中
private final static UDF2 get_hashed_id = new UDF2<String, String, String>() {
   public String call(String o, String o2) throws Exception {
       return o.concat(o2);
   }
};

private final static UDF1 clean_string = new UDF1<String, String>() {
   public String call(String o) throws Exception {
       return o.replace("\"","");
   }
};

然后你就可以在get_dimension中使用它们了。