我正在使用spark streaming从源读取。我试图写一个函数,检查给定的列是否包含空值或不。我试过这个:
boolean checkColumnNotNull(SparkSession spark,Dataset<Row>dataframe,String colName) throws InvalidColumnNameException {
if(!checkColumnExists(dataframe,colName)){
throw new InvalidColumnNameException ("column doesn't exist");
}
Dataset<Row> containingNulls = dataframe.where(dataframe.col(colName).isNull());
Row[] helper = (Row[]) containingNulls.take(1);
if(helper[0]==null){
return true;
}
return false;
}
然而,当我运行这段代码时,我得到一个错误,说:
Queries with streaming sources must be executed with writeStream.start();
在这条线上
Row[] helper = (Row[]) containingNulls.take(1);
因此,根据错误,我们不能在流数据框架上使用take()函数。我尝试了各种其他方法,如count(),isEmpty()等,但似乎所有这些函数都与批处理一起工作。
那么,是否有任何方法可以在流式数据框架上执行上述操作。任何帮助将不胜感激。谢谢!
2条答案
按热度按时间e4eetjau1#
在Spark Streaming中,流 Dataframe 不支持take()、count()和isEmpty()等操作。这是因为流式 Dataframe 会不断更新新数据,并且这些操作的结果会不断变化。
使用isEmpty代替,无需创建helper var:
如果它是流式的并且为空,则意味着在指定列中没有找到具有空值的行。
eni9jsuy2#
我有办法了。spark的foreachBatch函数允许我们将流数据处理成批处理,在这些批处理中,我们可以使用count(),isEmpty()等函数,这些函数适用于非流数据。