Dataframe column values not working in where clause

val creation_timestamp = df.groupBy().agg(min($"userCreation_timestamp").alias("ts")).col("ts")

df.filter(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()
or
df.where(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()

When I run the above code to show the data, I get the following exception:

org.apache.spark.sql.AnalysisException: Resolved attribute(s) ts#1658 missing from id#2,userCreation_timestamp#8,firstname#31 in operator !Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658).;;
!Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658)
+- Relation[id#02,userCreation_timestamp#8, 26 more fields] parquet

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:293)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:84)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:172)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:178)
  at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
  at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3306)
  at org.apache.spark.sql.Dataset.filter(Dataset.scala:1463)
  ... 49 elided

df.where(col("userCreation_timestamp").cast("timestamp") >= "2022-03-11 18:36:48").show()

When a literal value is used in the where clause the code works fine, but when the dataframe value is used, it fails.

You can first select the min timestamp as a value, and then use that value in the where/filter function. Please find a working example below:

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.{SparkSession, functions}

object QuestionWhosebug extends App {

  val spark = SparkSession.builder
    .master("local[*]")
    .appName("Sample App")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()

  import spark.sqlContext.implicits._

  val df = Seq(
    (1, "2022-03-11 18:36:48"),
    (2, "2022-03-11 19:00:00"),
    (3, "2022-03-11 20:00:00")
  ).toDF("id", "userCreation_timestamp")
    .withColumn("ts", col("userCreation_timestamp").cast(TimestampType))

  df.printSchema()

  val creation_timestamp = df
    .select(functions.min("ts"))
    .head().get(0)

  df.where(col("ts") > lit(creation_timestamp).cast(TimestampType))
    .show()
  
}

The schema is:

root
 |-- id: integer (nullable = false)
 |-- userCreation_timestamp: string (nullable = true)
 |-- ts: timestamp (nullable = true)

Output:

+---+----------------------+-------------------+
| id|userCreation_timestamp|                 ts|
+---+----------------------+-------------------+
|  2|   2022-03-11 19:00:00|2022-03-11 19:00:00|
|  3|   2022-03-11 20:00:00|2022-03-11 20:00:00|
+---+----------------------+-------------------+
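
If you would rather not collect the value to the driver at all, the same filter can also be written as a SQL scalar subquery, which Spark resolves on its own. A minimal sketch against the example dataframe above (the users view name is just for illustration):

// Register the example dataframe so it can be queried with SQL.
df.createOrReplaceTempView("users")

// The scalar subquery computes min(ts) once; > matches the output shown above.
spark.sql(
  """SELECT *
    |FROM users
    |WHERE ts > (SELECT min(ts) FROM users)
    |""".stripMargin
).show()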

If you are interested in similar questions about Spark, visit my blog: https://bigdata-etl.com/tag/apache-spark/

You are getting this error because your filter condition does not pass the value of the ts column, but the column itself. Since the ts attribute belongs to the aggregated dataframe and does not exist in df, you get an AnalysisException: Resolved attribute(s) ts#1658 missing exception.

If you want to pass the value of the column instead, you need to retrieve the first row of the aggregated dataframe, then extract the timestamp value from that row, and finally pass it to your condition using lit:

import org.apache.spark.sql.functions.{col, lit, min}
import spark.implicits._ // required for the $"..." column syntax

// Aggregate down to a single row, then pull the timestamp value out of the Row.
// Casting inside the aggregation ensures getTimestamp works even if the
// column is stored as a string.
val creation_timestamp = df
  .agg(min($"userCreation_timestamp".cast("timestamp")))
  .head()
  .getTimestamp(0)

df.filter(col("userCreation_timestamp").cast("timestamp") >= lit(creation_timestamp)).show()
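
For completeness: if you prefer to stay entirely in the DataFrame API instead of collecting the value with head(), a cross join against the single-row aggregate achieves the same thing. A sketch, assuming the same df as above (the min_ts alias is illustrative):

import org.apache.spark.sql.functions.{col, min}

// Single-row dataframe holding the minimum timestamp.
val minDf = df.agg(min(col("userCreation_timestamp").cast("timestamp")).alias("min_ts"))

// The cross join attaches min_ts to every row, so it resolves inside the filter.
df.crossJoin(minDf)
  .filter(col("userCreation_timestamp").cast("timestamp") >= col("min_ts"))
  .drop("min_ts")
  .show()

Since minDf has exactly one row, the cross join does not multiply the data, and Spark will typically broadcast the single-row side.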