如何在databricks流中运行scala中的if else语句

r7xajy2e  于 2021-07-14  发布在  Java
关注(0)|答案(2)|浏览(444)

我对scala和databricks流媒体都是新手。我正在将流式事件读入一个Dataframe,我想使用if else语句根据Dataframe是否为空来触发另一个笔记本。下面的简单代码(及其变体)

if(finalDF.isEmpty){ 
  print("0")
}
else{
  print("1")
}

持续导致以下错误

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs

如何将writestream.start()合并到上述代码中?或者,如果Dataframe是由流式事件填充的,我如何评估Dataframe内容并在此基础上执行一个或另一个操作?

4smxwvx5

4smxwvx51#

流df不能是空的或者不是设计的-流是无限的,如果你现在没有数据,那么你可以在下一秒得到一些新的东西。所以你的代码不起作用。
您可以使用foreachbatch来处理数据的“当前”快照,这样您就可以像处理“正常”的非流式Dataframe一样处理这些快照,但是您可能无法从笔记本内部触发笔记本,因此这两种情况的代码应该在同一个函数中,而不是在不同的笔记本中。

1sbrub3j

1sbrub3j2#

我测试了这段代码,它是一种引入if-else并根据事件内容决定操作的方法。

df.writeStream.foreachBatch((df: org.apache.spark.sql.DataFrame, batchID: Long) => myfunc(df)).start()

def myfunc(df: org.apache.spark.sql.DataFrame){
    val test1= df.filter($"col" === "test1")
    val test2= df.filter($"col" === "test2")
    if(test1.count()>0){ 
        dbutils.notebook.run("some_notebook", 60)
    }
    if(test2.count()>0){
        dbutils.notebook.run("another_notebook", 60)
    }
}

相关问题