Spark Cannot evaluate expression: lag of a window expression
I'm trying to perform a number of operations on a dataframe coming from a Cassandra table and then save it into another table. One of these operations is the following:
val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", lag(sum(col("temp1")).over(leadWindow), 2, 0))
When I run my job, I get an exception saying that the lag operation cannot be evaluated:
2018-10-08 12:02:22 INFO Cluster:1543 - New Cassandra host /127.0.0.1:9042 added
2018-10-08 12:02:22 INFO CassandraConnector:35 - Connected to Cassandra cluster: Test Cluster
2018-10-08 12:02:23 INFO CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
2018-10-08 12:02:23 INFO CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: lag(input[43, bigint, true], 2, 0)
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:258)
at org.apache.spark.sql.catalyst.expressions.OffsetWindowFunction.doGenCode(windowExpressions.scala:326)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
at org.apache.spark.sql.catalyst.expressions.Add.doGenCode(arithmetic.scala:174)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
at org.apache.spark.sql.catalyst.expressions.BinaryComparison.doGenCode(predicates.scala:513)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.And.doGenCode(predicates.scala:397)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun.apply(conditionalExpressions.scala:202)
at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun.apply(conditionalExpressions.scala:201)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.expressions.CaseWhen.doGenCode(conditionalExpressions.scala:201)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
at org.apache.spark.sql.execution.ProjectExec$$anonfun.apply(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.ProjectExec$$anonfun.apply(basicPhysicalOperators.scala:60)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2975)
at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2973)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:76)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:86)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at com.test.functions.package$ChecksFunctions.appendToTable(package.scala:66)
at com.test.TestFromCassandra$.main(TestFromCassandra.scala:66)
at com.test.TestFromCassandra.main(TestFromCassandra.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-10-08 12:02:31 INFO CassandraConnector:35 - Disconnected from Cassandra cluster: Test Cluster
Line 130 of the TestFromCassandra file is the call to the save() function. I haven't found any similar question on Stack Overflow.
Does anyone know why I'm getting this exception? Does the lag function have any limitation when applied to a rolling sum?
Edit:
I found a similar issue on Spark's Jira. There seems to be a bug in the filter function when it references a window function, and since the Cassandra connector filters the dataframe on the primary-key columns (using the isnotnull function) before saving it, this is probably what causes the exception.
Is there a way to perform this operation while avoiding this bug, but without using aggregate functions? Or does anyone know how to fix it?
Edit 2:
I also tried using a foreach writer with the connector's withSessionDo function to store my dataframe, but I still get the same exception. Has anyone encountered this problem before?
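For reference, that attempt looked roughly like this (a minimal sketch; the keyspace, table, and column names are placeholders). It fails with the same exception because the query plan, including the filter on the window expression, still has to execute:
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.Row
// Open a session per partition and insert rows manually instead of going
// through the DataFrameWriter. The INSERT targets a hypothetical table.
val connector = CassandraConnector(spark.sparkContext.getConf)
df.foreachPartition { rows: Iterator[Row] =>
  connector.withSessionDo { session =>
    rows.foreach { row =>
      session.execute(
        "INSERT INTO my_keyspace.my_table (id, ts, lead1) VALUES (?, ?, ?)",
        row.getAs[AnyRef]("id"), row.getAs[AnyRef]("ts"), row.getAs[AnyRef]("lead1"))
    }
  }
}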
Edit 3:
I found another way to achieve the operation I wanted:
val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").desc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", sum(col("temp1")).over(leadWindow))
So the problem is not caused by the filter: it seems it is simply not possible to use the lag function on a window expression.
I'm seeing the same error. Even though there is a workaround for this issue, Spark should fix it. I believe this problem occurs with any window function, not just LAG. The reason is that Spark tries to perform code generation on the filter, but window functions are not code-generatable. The workaround is to create a column from the window expression first and then use that column in the filter.
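A minimal sketch of that workaround (the window and column names are illustrative): materialize the window expression as an ordinary column, then filter on that column, so codegen only ever sees a plain column reference:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val w = Window.partitionBy(col("id")).orderBy(col("timestamp").asc)
// Step 1: compute the window expression into its own column.
val withPrev = df.withColumn("prevTemp1", lag(col("temp1"), 2, 0).over(w))
// Step 2: filter on the materialized column, not on the window expression.
val result = withPrev.filter(col("prevTemp1") > 0)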
I ran into the same problem, and then I noticed that you were using the over function inside lag (as I was). I changed it to this:
df.withColumn("lag1", lag(sum(col("temp1")), 2, 0).over(lagWindow))