Spark Scala - compile errors

I have a Scala script that runs fine in Zeppelin, but it fails when I try to compile it with sbt. I believe the problem is version-related, but I can't pin it down.

All three of these variants return the same error:

val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()

Error returned: "value rdd is not a member of Unit"

val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF

Error returned: "value toDF is not a member of org.apache.spark.rdd.RDD[U]"

Scala version: 2.12
sbt version: 1.3.13

Update: the whole class is:

package importer

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import org.apache.spark.sql._

import org.apache.spark.sql.functions._
import udf.functions._

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column

object BusinessImporter extends Importer{

    def importa(spark: SparkSession, inputDir: String): Unit = {
        
        import spark.implicits._
        val bizDF = spark.read.json(inputDir).cache

        // categories
        val explode_categories = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
        val sort_categories = explode_categories.select(col("categories").as("description"))
                .distinct
                .coalesce(1)
                .orderBy(asc("categories"))
        // Create sequence column
        val windowSpec = Window.orderBy("description")
        val categories_with_sequence = sort_categories.withColumn("id",row_number.over(windowSpec))
        val categories = categories_with_sequence.select("id","description")

        val catDF = categories.write.insertInto("categories")

        // business categories
        //val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
        //val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
        val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()
        val auxbizCatRDD = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
        val bizCatRDD = auxbizCatRDD.select("business_id","categories")
        val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF
        bizCat.write.insertInto("business_category")

        // Business
        val businessDF = bizDF.select("business_id","categories","city","address","latitude","longitude","name","is_open","review_count","stars","state")
        businessDF.coalesce(1).write.insertInto("business")

        // Hours
        val bizHoursDF = bizDF.select("business_id","hours.Sunday","hours.Monday","hours.Tuesday","hours.Wednesday","hours.Thursday","hours.Friday","hours.Saturday")

        val bizHoursDF_structs = bizHoursDF
            .withColumn("Sunday",struct(
            split(col("Sunday"),"-").getItem(0).as("Open"),
            split(col("Sunday"),"-").getItem(1).as("Close")))
            .withColumn("Monday",struct(
            split(col("Monday"),"-").getItem(0).as("Open"),
            split(col("Monday"),"-").getItem(1).as("Close")))
            .withColumn("Tuesday",struct(
            split(col("Tuesday"),"-").getItem(0).as("Open"),
            split(col("Tuesday"),"-").getItem(1).as("Close")))
            .withColumn("Wednesday",struct(
            split(col("Wednesday"),"-").getItem(0).as("Open"),
            split(col("Wednesday"),"-").getItem(1).as("Close")))
            .withColumn("Thursday",struct(
            split(col("Thursday"),"-").getItem(0).as("Open"),
            split(col("Thursday"),"-").getItem(1).as("Close")))
            .withColumn("Friday",struct(
            split(col("Friday"),"-").getItem(0).as("Open"),
            split(col("Friday"),"-").getItem(1).as("Close")))
            .withColumn("Saturday",struct(
            split(col("Saturday"),"-").getItem(0).as("Open"),
            split(col("Saturday"),"-").getItem(1).as("Close")))

        bizHoursDF_structs.coalesce(1).write.insertInto("business_hour")
        
    }

    def singleSpace(col: Column): Column = {
        trim(regexp_replace(col, " +", " "))
    }
}

The sbt file:

name := "yelp-spark-processor"
version := "1.0"
scalaVersion := "2.12.12"

libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12"  % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-hive_2.12" % "3.0.1"

Can someone point me in the right direction?

Many thanks, 沙维

The problem is that this line evaluates to type Unit in Scala, because insertInto writes the data and returns nothing:

val catDF = categories.write.insertInto("categories")

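Unit in Scala is like void in Java: it is the return type of functions that return nothing meaningful. So at this point catDF is not a DataFrame, and you can't treat it as one. You probably want to keep using categories instead of catDF in the lines that follow.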
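As a minimal sketch of that suggestion, the lookup map can be built from categories itself, with the write done as a separate statement. The object name CategoryLookup and the method categoryMap are hypothetical helpers introduced only for illustration; the column names id and description are taken from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper (not part of the original code): builds a
// description -> id lookup from the `categories` DataFrame itself,
// instead of from the Unit value returned by write.insertInto.
object CategoryLookup {
  def categoryMap(spark: SparkSession, categories: DataFrame): Map[String, Int] = {
    import spark.implicits._ // provides the $"..." syntax and tuple encoders
    categories
      .select($"description", $"id".cast("int"))
      .as[(String, Int)] // typed Dataset of (description, id) pairs
      .collect()         // bring the (small) category list to the driver
      .toMap
  }
}

// Inside importa, the two steps would then be kept separate:
//   categories.write.insertInto("categories")        // side effect only, returns Unit
//   val catMap = CategoryLookup.categoryMap(spark, categories)
```

With catMap typed as Map[String, Int], the later bizCatRDD.rdd.map(...).toDF line should be able to resolve its element type as well. The second error in the question is most likely a follow-on from the first, but that is an assumption I have not verified against the full project.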