Spark:SQL操作 cache、filter、selectExpr、agg、join、udf

x33g5p2x  于2021-11-21 转载在 Spark  
字(4.1k)|赞(0)|评价(0)|浏览(840)

这章主要是关于 Spark的SQL操作,如何把Spark与HIve连接起来,可以参考下面的文字**链接:**Spark SQL 操作 MySQL数据库和 Hive数据仓库

接着Hive时候的操作,继续。
要启动,hadoop集群、hive服务、spark。

/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh
hive --service metastore
/usr/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh

1、统计订单中商品的数量

进入数据库,导入sql包,定义变量存储sql(" ")的结果,分组groupby(订单),再count统计(),show显示出来。

import spark.sql
sql("use badou")

val orders=sql("select * from orders")
val products=sql("select * from products")
val priors=sql("select * from priors")

priors.show(10) // 查看有order_id中product_id

--方式一:
priors.groupBy("product_id").count().show
--方式二:
val proCnt = priors.groupBy("product_id").count()
proCnt.show(10)

show 默认显示20条数据
show(10):显示指定的条数
show(1,false) 显示的记录数 和针对字符过长进行格式化显示

cache方法:加载到内存

val proCnt = priors.groupBy("product_id").count().cache  //没有执行,只是加载到内存中
proCnt.show(10)    //第一次运行会慢
proCnt.show(100)  //再次运行就直接内存中读取非常快
proCnt.unpersist //在内存中直接移除

2、统计商品被再次购买(reordered)的数量

场景:当一个商品被重复购买,重复购买的比率越高(这类商品可以理解为消耗品,抽纸,洗发水等等),那下一次购买的可能性很高。

预测:购买这些商品的用户,下一次最容易购买哪些商品。

2.1 filter <==> where 针对集合中的元素进行过滤

要求:1、对orders的col列:"eval_set"=="test"过滤,输出结果。

1、先把orders存到内存,这样查找时候省时间
orders.cache
orders.show(10) //执行cache操作

【注意】:进行条件:等于是 “===”
--1、filter过滤:
orders.filter(col("eval_set")==="test").show(5)
--2、where过滤:
orders.where(col("eval_set")==="test").show(5)

对上面的结果再进行过滤,只显示周二的结果:filter(col("order_dow")==="1")

--两个过滤:where+filter
orders.where(col("eval_set")==="test").filter(col("order_dow")===1).show(10)

再次购买的列在priors表里

val priors=sql("select * from priors")
priors.cache
priors.show(10)

可以发现:reordered=1,代表重新购买。统计商品重新购买的次数

  • select: 进行列的方式处理 ;
  • selectExpr: 处理字符串表达式,直接写SQL语句;
  • agg:一般搭配groupBy这种聚合函数使用,在一次agg聚合中可以统计多个值:sum,avg,max,min。
限制列读取 priors表
priors.select(col("product_id"),col("reordered")).show(5)

方式一:简陋版
priors.filter(col("reordered")==="1").groupBy("product_id").count().show(10)

方式二:
priors.selectExpr("product_id","cast(reordered as int)").filter(col("reordered")===1).groupBy("product_id").sum().show(10)

方式三:
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").sum("reordered").show(10)

方式四:理想版
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered")).show(10)

方式五:我们需求版
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5)

我们发现结果的列显示sum(reordered),avg(reordered) 我们想重命名一下,用withColumnRenamed("old_name","new_name") 进行重命名。

priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).withColumnRenamed("sum(reordered)","sum_re").withColumnRenamed("avg(reordered)","avg_re")show(5)

3、统计被重复购买的比率

sum(重新购买的商品) / count (全部商品)

1、重复购买的商品量
val productSumRe = priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"))

2、总的商品量
val proCnt = priors.groupBy("product_id").count()
方式一: scala
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5)

方式二:SQL
--join连接表,表1.join(表2, "表都有的列名")

val jCnt = proCnt.join(productSumRe, "product_id")
jCnt.show(5)
jCnt.selectExpr("*", "sum_re/count as mean_re").show(5)

--------------------*******-------------------

方式三:udf:User Defined Function,用户自定义函数。
withColumn表示增加一列,withColumn("new_col","运算的结果")
udf((x_1:,x_2)=>)

import org.apache.spark.sql.functions._
val avg_udf = udf((sm:Long,cnt:Long)=>sm.toDouble/cnt.toDouble) // 实现sum/count

jCnt.withColumn("mean_re", avg_udf(col("sum_re"),col("count"))).show(5)
--对avg_udf格式化:sum_re/count 结果给 mean_re。

相关文章