如何在spark streaming中检查列不包含空值?

kg7wmglp  于 2023-06-24  发布在  Apache
关注(0)|答案(2)|浏览(161)

我正在使用spark streaming从源读取。我试图写一个函数,检查给定的列是否包含空值或不。我试过这个:

boolean checkColumnNotNull(SparkSession spark,Dataset<Row>dataframe,String colName) throws InvalidColumnNameException {
        if(!checkColumnExists(dataframe,colName)){
            throw new InvalidColumnNameException ("column doesn't exist");
        }
        Dataset<Row> containingNulls = dataframe.where(dataframe.col(colName).isNull());
        Row[] helper = (Row[]) containingNulls.take(1);
        if(helper[0]==null){
            return true;
        }
        return false;
    }

然而,当我运行这段代码时,我得到一个错误,说:

Queries with streaming sources must be executed with writeStream.start();

在这条线上

Row[] helper = (Row[]) containingNulls.take(1);

因此,根据错误,我们不能在流数据框架上使用take()函数。我尝试了各种其他方法,如count(),isEmpty()等,但似乎所有这些函数都与批处理一起工作。
那么,是否有任何方法可以在流式数据框架上执行上述操作。任何帮助将不胜感激。谢谢!

e4eetjau

e4eetjau1#

在Spark Streaming中,流 Dataframe 不支持take()、count()和isEmpty()等操作。这是因为流式 Dataframe 会不断更新新数据,并且这些操作的结果会不断变化。
使用isEmpty代替,无需创建helper var:

Dataset<Row> containingNulls = dataframe.filter(dataframe.col(colName).isNull());
return containingNulls.isStreaming() && containingNulls.isEmpty();

如果它是流式的并且为空,则意味着在指定列中没有找到具有空值的行。

eni9jsuy

eni9jsuy2#

我有办法了。spark的foreachBatch函数允许我们将流数据处理成批处理,在这些批处理中,我们可以使用count(),isEmpty()等函数,这些函数适用于非流数据。

相关问题