如何在 Databricks 流中 运行 Scala 中的 if else 语句

How to run an if else statement in Scala in Databricks streaming

我是 Scala 和 Databricks 流媒体的新手。我正在将流式事件读入数据帧,我想使用 if-else 语句根据数据帧是否为空来触发不同的笔记本。下面的简单代码(及其变体)

if(finalDF.isEmpty){ 
  print("0")
}
else{
  print("1")
}

持续导致以下错误

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs

如何将 writeStream.start() 合并到上面的代码中?或者,我如何评估数据框内容并基于此采取一个或另一个操作,因为数据框是由流式事件填充到其中的?

流式 DF 不能为空或设计为非空 - 流是无限的,如果您现在没有数据,那么您可以在下一秒获得新的东西。所以你的代码将不起作用。

您可以使用 foreachBatch 处理数据的“当前”快照,您可以像处理“正常”的非流式数据帧一样处理这些数据,但是您 可能无法 从里面触发笔记本,所以两种情况的代码应该在同一个函数中,而不是在不同的笔记本中。

我测试了这段代码,它可以作为一种引入 if-else 并根据事件内容决定操作的方法。

df.writeStream.foreachBatch((df: org.apache.spark.sql.DataFrame, batchID: Long) => myfunc(df)).start()

def myfunc(df: org.apache.spark.sql.DataFrame){
    val test1= df.filter($"col" === "test1")
    val test2= df.filter($"col" === "test2")
    if(test1.count()>0){ 
        dbutils.notebook.run("some_notebook", 60)
    }
    if(test2.count()>0){
        dbutils.notebook.run("another_notebook", 60)
    }
}