Spark: count and percentage of values per column, exception handling, and loading into a Hive DB

fkvaft9z asked on 2021-05-31 in Hadoop

In the Scala Spark code below, I need to find the count of each value in several columns, together with its percentage. To do this I apply the withColumn method to every column: date, usage, payment, dateFinal, usageFinal, paymentFinal. For each of these calculations I have to repeat the same withColumn chain for the sum and the percentage. Is there any way to avoid writing

.withColumn("SUM", sum("count").over() ).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100 ).drop("fraction")

for every column? You can see the repetition in the code below.

var dateFinalDF = dateFinal.toDF(DateColumn).groupBy(DateColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100).drop("fraction")

var usageFinalDF = usageFinal.toDF(UsageColumn).groupBy(UsageColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100).drop("fraction")

var paymentFinalDF = paymentFinal.toDF(PaymentColumn).groupBy(PaymentColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100).drop("fraction")

My current code is below. At the moment the handling of the different columns (date, usage, and so on) is hard-coded: for example, the code picks up the column whose name contains "date" and then applies the count and the other transformations we want. I would like to make this dynamic: all the column names should live in a YAML file, the names must be read from that file, and the code should process whatever columns it finds there. How can I achieve this, and how would I need to modify my code after reading the YAML file? Any help is appreciated.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer

object latest {

  def main(args: Array[String]): Unit = {

    var fileList = new ListBuffer[String]()
    var dateList = new ListBuffer[String]()
    var fileL = new ListBuffer[String]()
    var fileL1 = new ListBuffer[String]()

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("hbase sql")
    val sc = new SparkContext(sparkConf)
    val spark1 = SparkSession.builder().config(sc.getConf).getOrCreate()
    val sqlContext = spark1.sqlContext

    import spark1.implicits._

    // round a double to 2 decimal places
    def f1(number: Double) =
      "%.2f".format(number).toDouble

    val udfFunc = udf(f1 _)
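    // Note (not in the original code): Spark's built-in round(col("Percent"), 2)
    // from org.apache.spark.sql.functions could replace this UDF for the
    // two-decimal rounding, if preferred.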

    // adds the overall SUM and a rounded percentage column to a value-count DataFrame
    def getCountPercent(df: DataFrame): DataFrame = {
      df.withColumn("SUM", sum("count").over())
        .withColumn("fraction", col("count") / sum("count").over())
        .withColumn("Percent", col("fraction") * 100)
        .withColumn("number", udfFunc(col("Percent")))
        .drop("Percent")
        .drop("fraction")
    }

    // computes value counts and percentages for one column and appends the rows to fileList
    def occurenceCount(df: DataFrame, column: String): Unit = {
      val occurrenceDF = df.groupBy(column).count.transform(getCountPercent)
      for (u <- occurrenceDF.collect()) {
        fileList += column + '~' + u.mkString("~")
      }
    }

    // the header file provides the column names; the data file is pipe-delimited
    val headerCSV = spark1.sqlContext.read.format("CSV")
      .option("header", "true")
      .option("delimiter", """|""")
      .load("C:\\Users\\ayushgup\\Downloads\\Header3.csv")

    val columns = headerCSV.columns

    val data = spark1.sqlContext.read.format("CSV")
      .option("delimiter", """|""")
      .load("C:/Users/ayushgup/Downloads/home_data_usage_2018122723_1372673.csv")
      .toDF(columns: _*)

    // bucket / relabel each column, then compute its value distribution
    for (coll <- columns.toList) {

      if (coll.toLowerCase().contains("date")) {

        // keep only the year-month part of each date value
        for (datesss <- data.select(coll).collect()) {
          dateList += datesss.toString().slice(1, 8)
        }

        val dateFinalDF = dateList.toList.toDF(coll)
        occurenceCount(dateFinalDF, coll)

      } else if (coll.toLowerCase().contains("usage")) {

        val r = data.select(coll).withColumn(coll,
          when(col(coll) <= 1026, "<=1gb")
            .when(col(coll) > 1026 && col(coll) < 5130, "1-5gb")
            .when(col(coll) > 5130 && col(coll) < 10260, "5-10gb")
            .when(col(coll) > 10260 && col(coll) < 20520, "10-20gb")
            .when(col(coll) > 20520, ">20gb")
            .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.toLowerCase().contains("paymentamount")) {

        val r = data.select(coll).withColumn(coll,
          when(col(coll) <= 1500, "1-1500")
            .when(col(coll) > 1500 && col(coll) < 1700, "1500-1700")
            .when(col(coll) > 1700 && col(coll) < 1900, "1700-1900")
            .when(col(coll) > 1900 && col(coll) < 2000, "1900-2000")
            .when(col(coll) > 2000, ">2000")
            .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.toLowerCase().contains("accounttenure")) {

        val r = data.select(coll).withColumn(coll,
          when(col(coll) > 1000000 && col(coll) < 5000000, "1-5m")
            .when(col(coll) > 5000000 && col(coll) < 11000000, "5-11m")
            .when(col(coll) > 12000000 && col(coll) < 23000000, "12-23m")
            .when(col(coll) > 24000000 && col(coll) < 35000000, "24-35m")
            .when(col(coll) > 36000000, ">36m")
            .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.toLowerCase().equals("arpu")) {

        val r = data.select(coll).withColumn(coll,
          when(col(coll) <= 1500, "1-1500")
            .when(col(coll) > 1500 && col(coll) < 1700, "1500-1700")
            .when(col(coll) > 1700 && col(coll) < 1900, "1700-1900")
            .when(col(coll) > 1900 && col(coll) < 2000, "1900-2000")
            .when(col(coll) > 2000, ">2000")
            .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.equals("DisputeAmount") || coll.equals("ticketsAmount")) {

        val r = data.select(coll).withColumn(coll,
          when(col(coll) === 0, "0")
            .when(col(coll) > 0, ">0")
            .otherwise(1)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.equals("serviceOrdersCreatedLast90Days")) {

        val r = data.select(coll).withColumn(coll,
          when(col(coll) === 0, "0")
            .when(col(coll) === 1, "1")
            .when(col(coll) === 2, "2")
            .when(col(coll) === 3, "3")
            .when(col(coll) > 3, ">3")).toDF(coll)

        occurenceCount(r, coll)

      } else {

        // occurenceCount already does the groupBy/count, so pass the raw column
        occurenceCount(data.select(coll), coll)

      }
    }

    // strip the surrounding [ ] that Row.toString adds, then split the ~-joined fields
    val f = fileList.toList
    for (flist <- f) {
      fileL += flist.replaceAll("[\\[\\]]", "")
    }

    var ff = fileL.toDF()

    var df1: DataFrame = ff.selectExpr(
      "split(value, '~')[0] as Attribute",
      "split(value, '~')[1] as Value",
      "split(value, '~')[2] as Count",
      "split(value, '~')[3] as Sum",
      "split(value, '~')[4] as Percent")
  }
}
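For the YAML part of the question, a minimal sketch of reading the column list, assuming the SnakeYAML library (org.yaml:snakeyaml) is on the classpath and a file named columns.yml with a top-level "columns" list (the file name and key are placeholders):

// Hypothetical columns.yml layout:
//   columns:
//     - date
//     - usage
//     - paymentamount
import java.io.FileInputStream
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._
import org.yaml.snakeyaml.Yaml

val yaml = new Yaml()
val cfg = yaml.load(new FileInputStream("columns.yml"))
              .asInstanceOf[JMap[String, JList[String]]]
val yamlColumns: List[String] = cfg.get("columns").asScala.toList

The for loop above would then iterate over yamlColumns instead of headerCSV.columns, and any name not matched by the branches would simply fall into the final else case.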

Answer 1 (xlpyo6sf)

Another way... the Scala collections zip/map style :-)

scala> val df = Seq((10,20,30),(15,25,35)).toDF("date", "usage", "payment")
df: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]

scala> df.show(false)
+----+-----+-------+
|date|usage|payment|
+----+-----+-------+
|10  |20   |30     |
|15  |25   |35     |
+----+-----+-------+

scala> df.columns
res75: Array[String] = Array(date, usage, payment)

scala> var df2,df3,df4 = df
df2: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]
df3: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]
df4: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]

scala> val arr_all = Array(df2,df3,df4).zip(df.columns).map( d => d._1.groupBy(d._2).count.withColumn("sum",sum('count).over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100 ).drop("fraction") )
arr_all: Array[org.apache.spark.sql.DataFrame] = Array([date: int, count: bigint ... 2 more fields], [usage: int, count: bigint ... 2 more fields], [payment: int, count: bigint ... 2 more fields])

scala> val Array(dateFinalDF,usageFinalDF,paymentFinalDF) = arr_all
dateFinalDF: org.apache.spark.sql.DataFrame = [date: int, count: bigint ... 2 more fields]
usageFinalDF: org.apache.spark.sql.DataFrame = [usage: int, count: bigint ... 2 more fields]
paymentFinalDF: org.apache.spark.sql.DataFrame = [payment: int, count: bigint ... 2 more fields]

scala> dateFinalDF.show(false)
2019-01-25 04:10:10 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+-----+---+-------+
|date|count|sum|Percent|
+----+-----+---+-------+
|15  |1    |2  |50.0   |
|10  |1    |2  |50.0   |
+----+-----+---+-------+

scala> usageFinalDF.show(false)
2019-01-25 04:10:20 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-----+-----+---+-------+
|usage|count|sum|Percent|
+-----+-----+---+-------+
|20   |1    |2  |50.0   |
|25   |1    |2  |50.0   |
+-----+-----+---+-------+

scala> paymentFinalDF.show(false)
2019-01-25 04:10:50 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-------+-----+---+-------+
|payment|count|sum|Percent|
+-------+-----+---+-------+
|35     |1    |2  |50.0   |
|30     |1    |2  |50.0   |
+-------+-----+---+-------+

scala>

Note that I deliberately broke this out with var df2,df3,df4 = df so that the steps are easy to follow.
It can all be combined like this:

scala> val Array(dateFinalDF,usageFinalDF,paymentFinalDF) = Array(df,df,df).zip(df.columns).map( d => d._1.groupBy(d._2).count.withColumn("sum",sum('count).over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100 ).drop("fraction") )
dateFinalDF: org.apache.spark.sql.DataFrame = [date: int, count: bigint ... 2 more fields]
usageFinalDF: org.apache.spark.sql.DataFrame = [usage: int, count: bigint ... 2 more fields]
paymentFinalDF: org.apache.spark.sql.DataFrame = [payment: int, count: bigint ... 2 more fields]

scala>

Answer 2 (hsgswve4)

You can encapsulate all the .withColumn() operations in a function that returns the DataFrame after all the operations have been applied.

def getCountPercent(df: DataFrame): DataFrame = {
  df.withColumn("SUM", sum("count").over() )
    .withColumn("fraction", col("count") / sum("count").over())
    .withColumn("Percent", col("fraction") * 100 )
    .drop("fraction")
}

Usage: apply the function with .transform():

var dateFinalDF = dateFinal.toDF(DateColumn).groupBy(DateColumn).count.transform(getCountPercent)
var usageFinalDF = usageFinal.toDF(UsageColumn).groupBy(UsageColumn).count.transform(getCountPercent)
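
Once the logic lives in a function, you can also map it over several inputs in one pass. A small sketch, reusing the dateFinal/usageFinal/paymentFinal values and column names from the question:

val inputs = Seq(
  (dateFinal.toDF(DateColumn), DateColumn),
  (usageFinal.toDF(UsageColumn), UsageColumn),
  (paymentFinal.toDF(PaymentColumn), PaymentColumn))

// one count/percentage DataFrame per input column
val results = inputs.map { case (df, c) => df.groupBy(c).count.transform(getCountPercent) }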
