1**、SparkCore Example示例**
在Spark的安装目录下提供了很多的程序示例(有Java,Python,Scala等语言版本),由于使用Scala编程语言在Spark上开发程序比Java语言精简、方便,后续将侧重于使用Scala语言开发Spark程序,因此,大家很有必要去自学下Scala编程语言。Spark自带的example示例如下图所示:
这里简单讲述下Spark自带示例SparkPi(蒙特卡洛求圆周率Pi)的运行,该程序示例的源代码如下图所示:
进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令bin/spark-submit --master spark://hadoop221:7077 --classorg.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 2000,运行结果如下图所示。从输出的log中可以看到Pi is roughly 3.141436235707181这样一行log,表明此次计算得出的圆周率为3.141436235707181。这里再稍稍解释下命令中的各个参数,bin/spark-submit——提交Spark任务使用的命令,--master——指定连接的Master节点,--class——指定运行程序的主类main clas,后面的路径表示程序代码被打包成jar包存放的位置,2000表示此轮计算进行2000次模拟,这个数值越大最终计算得到的圆周率值也会更逼近真实值。
2**、开发Java版WordCount程序**
使用Java语言进行Spark程序开发,首先,需要导入相应的jar包,这些jar包位于Spark的安装目录jars下,完整路径为/root/training/spark-2.1.0-bin-hadoop2.7/jars。这里需要注意的是,运行该WordCount Spark程序有两种模式,一种是本地模式,另一种是集群模式,本地模式就是直接在本地Eclipse中运行,集群模式就是将程序打包成jar包提交到Spark中运行。下面将简单介绍使用这两种模式来运行WordCount Spark程序。
本地模式
使用本地模式运行该WorkCount Spark程序非常简单,同运行普通的Java程序完全一样,这里就不做过多解释。程序代码如下图所示:
运行结果如下图所示,在Spark中,跟MapReduce程序不一样的地方是,Spark程序的输出结果默认不会按字典排序,而MapReduce程序的输出结果默认按字典排序。
集群模式
使用集群模式运行该WordCount Spark程序,稍微要复杂一些,同MapReduce程序一样,需要将程序打包成jar包提交到Spark上运行。程序代码如下图所示:
编写完程序后,将程序导出export为JavaWordCount.jar包,通过远程FTP工具上传到主机hadoop221的/root/temp目录下,然后进入到/root/training/spark-2.1.0-bin-hadoop2.7目录下,运行命令bin/spark-submit --master spark://hadoop221:7077 --classdemo.JavaWordCount /root/temp/JavaWordCount.jarhdfs://hadoop221:9000/input/data.txt hdfs://hadoop221:9000/output/javawc01,提交该WordCount Spark程序到Spark上运行,运行结果如下图所示。
在程序中,同时也将输出结果保存到了HDFS相应的目录下,查看HDFS对应目录及结果保存文件的内容,如下图所示。这里有个问题需要注意,该程序未进行重分区,因此Spark默认进行了2个分区,但奇怪的是第一个分区文件的大小又是0字节。若想要将结果输出到一个分区文件中,该怎么办呢?后文将给出答案。
3**、开发Scala版WordCount程序**
开发Scala版本的WordCount Spark程序,同样需要导入相应的jar包,这跟前面开发Java版本的WordCount Spark程序完全一致,这里不再重复讲述。同运行Java版本的WordCount Spark程序一样,这里也有两种运行模式,即本地模式和集群模式,下面分别进行简单讲解。
本地模式
跟前面一样,本地模式就是在本地Eclipse中直接运行Scala程序,简单方便。运行Scala程序,需要安装相应的Scala集成开发环境,本人这里安装的是Scala IDE,它的使用方式其实就跟普通的Eclipse完全一样,只是适用于Scala 程序的开发。代码如下图所示,可以明显看出,相比于Java版本的程序,Scala版本的程序精简太多太多,但它的可读性稍微要差些,需要对Scala语言比较熟悉阅读起来才能得心应手。
运行结果如下图所示,可以看到,这里的输出结果跟前面Java版本的WordCount Spark程序的输出结果完全一致。
集群模式
使用集群模式运行Scala版本的WordCount Spark程序,跟Java版本完全一样,需要将程序导出export为jar包提交到Spark上执行,程序代码如下图所示:
编写完程序后,将程序导出export为ScalaWordCount.jar包,然后将该jar包上传到主机hadoop221的/root/temp目录下,最后运行命令bin/spark-submit --master spark://hadoop221:7077 --classdemo.ScalaWordCount /root/temp/ScalaWordCount.jar hdfs://192.168.12.221:9000/input/data.txthdfs://192.168.12.221:9000/output/scalawc02即可。待程序执行完成后,查看HDFS指定目录及结果保存的文件内容,如下图所示。这里给出了前文问题(如何将结果输出到一个分区文件中)的答案,在程序中调用repartition(1)即可,从下图可以明显看到,HDFS指定目录下只产生了一个分区文件。
4**、编程案例一:求网站的访问量**
这个编程案例的功能是,分析一份Tomcat Log(即网站的访问日志),最终得出访问量最高的两个网页。首先,简单介绍下测试数据集,文件tomcat_log.txt中包含了30条Tomcat访问日志,内容如下图所示:
第一个字段表示客户端IP地址,第二个字段表示访问时间,第三个字段表示访问的网站资源,第四个字段表示服务器返回的状态码,第五个字段表示此次资源请求的流量。
这个编程案例总体上来说比较简单,目的是练习下SparkCore基础编程的一些知识点,程序的核心在于数据内容的解析上。程序代码如下图所示:
运行结果如下图所示,从输出的log中可以看到,网站访问量最高的两个网页分别是oracle.jsp以及hadoop.jsp,它们均被访问了9次。
5**、编程案例二:创建自定义分区**
这个编程案例的功能是,分析Tomcat Log日志(即网站访问日志),根据被访问的网页名称(即jsp文件名称),将各自的访问日志存放到不同的分区文件中,以实现归类存储(具有相同网页名称的日志,被保存到同一个分区文件中)。这里使用的测试数据集,同样是上面使用的tomcat_log.txt文件,具体数据内容及说明参考前文,这里不再赘述。程序代码如下图所示,这里需要特别强调下,由于该程序是运行在Windows的Eclipse IDE集成开发环境中,System.setProperty("hadoop.home.dir","C:\MyDocuments\winutils")这行代码必不可少,否则程序的运行会报错。winutils.exe是Windows系统上hadoop的调试工具,里面包含Hadoop、Spark所需要的一些基本调试工具类。操作方法是,在相应目录(这里对应的是C:\MyDocuments\winutils)下创建bin文件夹,将winutils.exe工具拷贝到该bin文件夹下即可。
程序执行完后,会自动在C:\MyDocuments\目录下创建partitionDemo文件夹,并将生成的结果保存到该文件夹下,如下图所示:
由于生成的文件较多,就不一一进行查看了,仅查看下前三个文件中的内容。其他文件中保存的输出内容也是类似的,它们各自包含了相同网页名称所对应的所有访问日志。
part-00000文件中的内容如下图所示:
part-00001文件中的内容如下图所示:
part-00002文件中的内容如下图所示:
SparkCore编程实践的内容就简单地讲这么多,关键还是需要自己去动手实践,下期再见。
参考文献:
——《CSDN博客》
——《潭州大数据课程课件》
转自https://mp.weixin.qq.com/s/fMTefYMaxpYzF3XNBdDclw
内容来源于网络,如有侵权,请联系作者删除!