Removing outliers in Spark with the interquartile range method produces an error

6tqwzwtp · asked on 2023-03-23 · Apache

I have the following recursive function, which uses the interquartile range (IQR) method to identify outliers:

def interQuartileRangeFiltering(df: DataFrame): DataFrame = {
    @scala.annotation.tailrec
    def inner(cols: List[String], acc: DataFrame): DataFrame = cols match {
      case Nil          => acc
      case column :: xs =>
        val quantiles = acc.stat.approxQuantile(column, Array(0.25, 0.75), 0.0) // TODO: values should come from config
        println(s"$column ${quantiles.size}")
        val q1 = quantiles(0)
        val q3 = quantiles(1)
        val iqr = q1 - q3
        val lowerRange = q1 - 1.5 * iqr
        val upperRange = q3 + 1.5 * iqr
        val filtered = acc.filter(s"$column < $lowerRange or $column > $upperRange")
        inner(xs, filtered)
    }
    inner(df.columns.toList, df)
}

val outlierDF = interQuartileRangeFiltering(incomingDF)

So basically what I am doing is recursively iterating over the columns and eliminating the outliers. Strangely enough, it results in an ArrayIndexOutOfBoundsException and prints the following:

housing_median_age 2
inland 2
island 2
population 2
total_bedrooms 2
near_bay 2
near_ocean 2
median_house_value 0
java.lang.ArrayIndexOutOfBoundsException: 0
  at inner$1(<console>:75)
  at interQuartileRangeFiltering(<console>:83)
  ... 54 elided

What is wrong with my approach?
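
(Note: approxQuantile ignores null and NaN values and returns an empty array when a column contains nothing else, which matches the "median_house_value 0" line above and explains the failing quantiles(0) access. A minimal guard, sketched with an illustrative helper name:)

import org.apache.spark.sql.DataFrame

// Sketch: compute the IQR bounds only when both quantiles are available;
// approxQuantile returns an empty array for a column whose values are all null/NaN.
def iqrBounds(df: DataFrame, column: String): Option[(Double, Double)] = {
  val quantiles = df.stat.approxQuantile(column, Array(0.25, 0.75), 0.0)
  if (quantiles.length < 2) None
  else {
    val q1 = quantiles(0)
    val q3 = quantiles(1)
    val iqr = q3 - q1 // note: q3 - q1, not q1 - q3 as in the snippet above
    Some((q1 - 1.5 * iqr, q3 + 1.5 * iqr))
  }
}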

3zwjbxry 1#

This counts the outliers in a column and removes them only when they account for less than 1% of the rows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def checkOutliersNum(columnName: String, df: DataFrame): DataFrame = {
  val total = df.count()
  val quantiles = df.stat.approxQuantile(columnName, Array(0.25, 0.75), 0.0)
  val q1 = quantiles(0)
  val q3 = quantiles(1)
  val iqr = q3 - q1
  val lowerBound = q1 - 1.5 * iqr
  val upperBound = q3 + 1.5 * iqr
  // Count the rows falling outside the IQR bounds for this column
  val outliersCount = df.filter(col(columnName) < lowerBound || col(columnName) > upperBound).count()
  val percentage = (outliersCount.toFloat / total.toFloat) * 100
  // Remove the outliers only if they make up between 0% and 1% of the rows.
  // Return the full DataFrame (all columns) so the result can be reused for the next column.
  if (percentage < 1 && percentage > 0) {
    df.filter(col(columnName) >= lowerBound && col(columnName) <= upperBound)
  } else {
    df
  }
}

// Each column is assumed to follow (roughly) a normal distribution.
val columnsInNormalDistr = Array("column_A", "column_B", "column_N")
// var df = ...  (df must be a var, declared before the loop, so the filtered result carries over)
for (columnName <- columnsInNormalDistr) {
  df = checkOutliersNum(columnName, df)
}
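
If the list of numeric columns is not known up front, it could also be derived from the schema instead of hard-coded; a small sketch, assuming df is the DataFrame from the loop above:

import org.apache.spark.sql.types.NumericType

// Collect the names of all numeric columns from the schema;
// approxQuantile only works on numeric columns.
val numericColumns = df.schema.fields.collect {
  case f if f.dataType.isInstanceOf[NumericType] => f.name
}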

kcrjzv8t 2#

Here is what I came up with, and it works well:

import scala.annotation.tailrec
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import spark.implicits._ // assumes `spark` is the active SparkSession; needed for $"..." and the Long encoder used by .map below

def outlierEliminator(df: DataFrame, colsToIgnore: List[String])(fn: (String, DataFrame) => (Double, Double)): DataFrame = {

  val ID_COL_NAME = "id"
  // DataFrameUtils.addColumnIndex is a custom helper (not shown) that attaches a row-id column
  val dfWithId = DataFrameUtils.addColumnIndex(spark, df, ID_COL_NAME)
  val dfWithIgnoredCols = dfWithId.drop(colsToIgnore: _*)

  @tailrec
  def inner(
    cols: List[String],
    filterIdSeq: List[Long],
    dfWithId: DataFrame
  ): List[Long] = cols match {
    case Nil          => filterIdSeq
    case column :: xs =>
      if (column == ID_COL_NAME) {
        inner(xs, filterIdSeq, dfWithId)
      } else {
        // fn returns the (lower, upper) bounds for this column, e.g. IQR-based
        val (lowerBound, upperBound) = fn(column, dfWithId)
        // Collect the ids of all rows outside the bounds for this column
        val filteredIds =
          dfWithId
            .filter(s"$column < $lowerBound or $column > $upperBound")
            .select(col(ID_COL_NAME))
            .map(r => r.getLong(0))
            .collect
            .toList
        inner(xs, filteredIds ++ filterIdSeq, dfWithId)
      }
  }

  val filteredIds = inner(dfWithIgnoredCols.columns.toList, List.empty[Long], dfWithIgnoredCols)
  // Keep only the rows whose id was never flagged for any column
  dfWithId.except(dfWithId.filter($"$ID_COL_NAME".isin(filteredIds: _*)))
}
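
For completeness, a hedged sketch of a bound-computing function that could be passed in as fn, reusing the approxQuantile call from the question; the helper name and the fallback bounds for an all-null column are illustrative, not part of the original answer:

// Illustrative IQR-based bound function matching fn: (String, DataFrame) => (Double, Double).
// When approxQuantile returns no quantiles (the column is all null/NaN), fall back to
// bounds that filter nothing out.
def iqrBoundsFn(column: String, df: DataFrame): (Double, Double) = {
  val quantiles = df.stat.approxQuantile(column, Array(0.25, 0.75), 0.0)
  if (quantiles.length < 2) {
    (Double.MinValue, Double.MaxValue)
  } else {
    val q1 = quantiles(0)
    val q3 = quantiles(1)
    val iqr = q3 - q1
    (q1 - 1.5 * iqr, q3 + 1.5 * iqr)
  }
}

val cleanedDF = outlierEliminator(incomingDF, List.empty[String])(iqrBoundsFn)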
