这章主要是关于 Spark的SQL操作,如何把Spark与HIve连接起来,可以参考下面的文字**链接:**Spark SQL 操作 MySQL数据库和 Hive数据仓库
接着Hive时候的操作,继续。
要启动,hadoop集群、hive服务、spark。
/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh
hive --service metastore
/usr/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh
进入数据库,导入sql包,定义变量存储sql(" ")的结果,分组groupby(订单),再count统计(),show显示出来。
import spark.sql
sql("use badou")
val orders=sql("select * from orders")
val products=sql("select * from products")
val priors=sql("select * from priors")
priors.show(10) // 查看有order_id中product_id
--方式一:
priors.groupBy("product_id").count().show
--方式二:
val proCnt = priors.groupBy("product_id").count()
proCnt.show(10)
show 默认显示20条数据
show(10):显示指定的条数
show(1,false) 显示的记录数 和针对字符过长进行格式化显示
cache方法:加载到内存
val proCnt = priors.groupBy("product_id").count().cache //没有执行,只是加载到内存中
proCnt.show(10) //第一次运行会慢
proCnt.show(100) //再次运行就直接内存中读取非常快
proCnt.unpersist //在内存中直接移除
场景:当一个商品被重复购买,重复购买的比率越高(这类商品可以理解为消耗品,抽纸,洗发水等等),那下一次购买的可能性很高。
预测:购买这些商品的用户,下一次最容易购买哪些商品。
要求:1、对orders的col列:"eval_set"=="test"
过滤,输出结果。
1、先把orders存到内存,这样查找时候省时间
orders.cache
orders.show(10) //执行cache操作
【注意】:进行条件:等于是 “===”
--1、filter过滤:
orders.filter(col("eval_set")==="test").show(5)
--2、where过滤:
orders.where(col("eval_set")==="test").show(5)
对上面的结果再进行过滤,只显示周二的结果:filter(col("order_dow")==="1")
--两个过滤:where+filter
orders.where(col("eval_set")==="test").filter(col("order_dow")===1).show(10)
再次购买的列在priors表里
val priors=sql("select * from priors")
priors.cache
priors.show(10)
可以发现:reordered=1,代表重新购买。统计商品重新购买的次数
限制列读取 priors表
priors.select(col("product_id"),col("reordered")).show(5)
方式一:简陋版
priors.filter(col("reordered")==="1").groupBy("product_id").count().show(10)
方式二:
priors.selectExpr("product_id","cast(reordered as int)").filter(col("reordered")===1).groupBy("product_id").sum().show(10)
方式三:
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").sum("reordered").show(10)
方式四:理想版
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered")).show(10)
方式五:我们需求版
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5)
我们发现结果的列显示sum(reordered),avg(reordered) 我们想重命名一下,用withColumnRenamed("old_name","new_name")
进行重命名。
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).withColumnRenamed("sum(reordered)","sum_re").withColumnRenamed("avg(reordered)","avg_re")show(5)
sum(重新购买的商品) / count (全部商品)
1、重复购买的商品量
val productSumRe = priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"))
2、总的商品量
val proCnt = priors.groupBy("product_id").count()
方式一: scala
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5)
方式二:SQL
--join连接表,表1.join(表2, "表都有的列名")
val jCnt = proCnt.join(productSumRe, "product_id")
jCnt.show(5)
jCnt.selectExpr("*", "sum_re/count as mean_re").show(5)
--------------------*******-------------------
方式三:udf:User Defined Function,用户自定义函数。
withColumn表示增加一列,withColumn("new_col","运算的结果")
udf((x_1:,x_2)=>)
import org.apache.spark.sql.functions._
val avg_udf = udf((sm:Long,cnt:Long)=>sm.toDouble/cnt.toDouble) // 实现sum/count
jCnt.withColumn("mean_re", avg_udf(col("sum_re"),col("count"))).show(5)
--对avg_udf格式化:sum_re/count 结果给 mean_re。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_44775255/article/details/121405932
内容来源于网络,如有侵权,请联系作者删除!