from pyspark.sql import functions as f
class Outlier:
    """Flag and inspect outliers in the numeric columns of a Spark DataFrame.

    A value is an outlier when it falls outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR],
    where Q1/Q3 are the (exact, relativeError=0) 25th/75th percentiles.
    Only columns with dtype 'bigint' or 'double' are considered.
    """

    def __init__(self, df):
        # df: pyspark.sql.DataFrame to analyze
        self.df = df

    def _calculate_bounds(self):
        """Return {column: {'q1', 'q3', 'min', 'max'}} for each numeric column.

        'min'/'max' are the 1.5*IQR whisker bounds, not the column's actual
        min/max values.
        """
        bounds = {
            c: dict(
                # relativeError=0 gives exact quantiles (costly on huge data)
                zip(["q1", "q3"], self.df.approxQuantile(c, [0.25, 0.75], 0))
            )
            for c, d in zip(self.df.columns, self.df.dtypes)
            if d[1] in ["bigint", "double"]
        }
        for c in bounds:
            iqr = bounds[c]['q3'] - bounds[c]['q1']
            bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
            bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)
        return bounds

    def _flag_outliers_df(self):
        """Return a DataFrame with one '<col>_outlier' column per numeric column.

        Each cell holds the original value when it is an outlier, else null.
        """
        bounds = self._calculate_bounds()
        outliers_col = [
            f.when(
                ~f.col(c).between(bounds[c]['min'], bounds[c]['max']),
                f.col(c),
            ).alias(c + '_outlier')
            for c in bounds
        ]
        return self.df.select(*outliers_col)

    def show_outliers(self):
        """Print (via DataFrame.show) the non-null outlier values per column."""
        outlier_df = self._flag_outliers_df()
        for outlier in outlier_df.columns:
            outlier_df.select(outlier).filter(f.col(outlier).isNotNull()).show()

    def count_outliers(self):
        """Return {'<col>_outlier': n} — number of outliers per numeric column.

        Uses a single aggregation pass: f.count(col) counts only non-null
        values, and non-outliers are null in the flagged DataFrame.
        """
        outlier_df = self._flag_outliers_df()
        row = outlier_df.agg(
            *[f.count(f.col(c)).alias(c) for c in outlier_df.columns]
        ).collect()[0]
        return row.asDict()
然后我使用 Outlier(df).show_outliers() 查看异常值,输出依次显示每一列对应的异常值(如图所示:第一列的异常值、第二列的异常值)。
如何计算每列的异常值数?换句话说,如何统计 pres_outlier 和 temp_outlier 列中非空值的个数?
暂无答案!
目前还没有任何答案,快来回答吧!