Learning Spark: Example with where doesn't work

I am trying to run the examples from the book Learning Spark.

One of them uses a column in a where expression in this form:

val fewFireDF = fireDF
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .where($"CallType" =!= "Medical Incident")

But IntelliJ IDEA does not understand $"CallType". It treats it as a plain string.

These variants work fine:

.where(col("CallType") =!= "Medical Incident")
.where("CallType != 'Medical Incident'")

Update: It seems I did not explain my problem clearly.

Here is my code:

package org.example.chapter3

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.dsl.expressions.{DslExpression, StringToAttributeConversionHelper}
import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._

object DepartmentCalls extends App {
  val spark = SparkSession
    .builder
    .appName("DepartmentCalls")
    .getOrCreate()

  if (args.length < 1) {
    println("usage: DepartmentCalls <file path to fire_incidents.csv>")
    System.exit(1)
  }

  val schema = StructType(
    Array(
      StructField("CallNumber", IntegerType),
      StructField("UnitID", StringType),
      StructField("IncidentNumber", IntegerType),
      StructField("CallType", StringType),
      StructField("CallDate", StringType),
      StructField("WatchDate", StringType),
      StructField("CallFinalDisposition", StringType),
      StructField("AvailableDtTm", StringType),
      StructField("Address", StringType),
      StructField("City", StringType),
      StructField("Zipcode", IntegerType),
      StructField("Battalion", StringType),
      StructField("StationArea", StringType),
      StructField("Box", StringType),
      StructField("OriginalPriority", StringType),
      StructField("Priority", StringType),
      StructField("FinalPriority", IntegerType),
      StructField("ALSUnit", BooleanType),
      StructField("CallTypeGroup", StringType),
      StructField("NumAlarms", IntegerType),
      StructField("UnitType", StringType),
      StructField("UnitSequenceInCallDispatch", IntegerType),
      StructField("FirePreventionDistrict", StringType),
      StructField("SupervisorDistrict", StringType),
      StructField("Neighborhood", StringType),
      StructField("Location", StringType),
      StructField("RowID", StringType),
      StructField("Delay", FloatType)
    )
  )

  // Read the file using the CSV DataFrameReader
  val sfFireFile = args(0)
  val fireDF = spark.read.schema(schema)
    .option("header", "true")
    .csv(sfFireFile)

  println(fireDF.count())

  val fewFireDF = fireDF
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .where($"CallType" =!= "Medical Incident")

  fewFireDF.show(5, false)

}

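IntelliJ IDEA reports the following errors:

  1. Cannot resolve overloaded method 'where'
  2. Type mismatch. Required: Expression, found: String (reported on "Medical Incident")

When I try to compile the code, I get the following error: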

[error] /Users/xxxxxxx/Workspace/Learning/Spark/learning-spark/src/main/scala/org/example/chapter3/DepartmentCalls.scala:62:28: type mismatch;
[error]  found   : String("Medical Incident")
[error]  required: org.apache.spark.sql.catalyst.expressions.Expression
[error]     .where($"CallType" =!= "Medical Incident")
[error]                            ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed

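You are probably missing an import in scope at the call site. The $"<column name>" shortcut is normally brought in by import sparkSession.implicits._, where sparkSession is the name of your SparkSession value (spark in the code above, so import spark.implicits._). If you have 'optimize imports' enabled, IntelliJ will often remove this import because it does not recognize that it is in use.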
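To make that concrete, here is a minimal sketch of the import in place. It assumes the compile error comes from the org.apache.spark.sql.catalyst.dsl.expressions import in the question: that import appears to supply its own $"..." interpolator, one that builds a Catalyst expression rather than a Column, which would explain why the compiler asks for an Expression on the right-hand side of =!=. The object name, the local[*] master, and the three in-memory rows are made up for illustration and stand in for the real CSV file.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name; not part of the original project.
object DollarColumnExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("DollarColumnExample")
      .master("local[*]") // assumption: run locally, without spark-submit
      .getOrCreate()

    // Must come after `spark` is defined. Brings $"..." (as a Column) and toDF into scope.
    // Note there is no import of org.apache.spark.sql.catalyst.dsl.expressions here.
    import spark.implicits._

    // Made-up rows standing in for the fire incidents CSV.
    val fireDF = Seq(
      (1, "01/11/2002 01:51:44 AM", "Structure Fire"),
      (2, "01/11/2002 03:01:18 AM", "Medical Incident"),
      (3, "01/11/2002 02:39:50 AM", "Medical Incident")
    ).toDF("IncidentNumber", "AvailableDtTm", "CallType")

    // Same expression as in the book: $"CallType" is now a Column,
    // so =!= accepts a plain String on the right-hand side.
    val fewFireDF = fireDF
      .select("IncidentNumber", "AvailableDtTm", "CallType")
      .where($"CallType" =!= "Medical Incident")

    fewFireDF.show(5, false)

    spark.stop()
  }
}
```

In the question's code, the equivalent change would be to delete the catalyst.dsl.expressions import and add import spark.implicits._ right after the val spark = ... definition. If IntelliJ keeps removing the import, excluding it from 'optimize imports' or falling back to col("CallType") avoids the problem.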