如何创建pyspark应用程序

qxsslcnc  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(372)

我的要求是使用pyspark从hdfs读取数据,只过滤所需的列,删除空值,然后将处理后的数据写回hdfs。一旦这些步骤完成,我们需要从hdfs中删除原始脏数据。这是我的每个操作的脚本。
导入库和依赖项


# Spark Version = > version 2.4.0-cdh6.3.1

from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate() 
import pyspark.sql.functions as F

从hdfs读取数据

df_load_1 = sparkSession.read.csv('hdfs:///cdrs/file_path/*.csv', sep = ";")

仅选择所需的列

col = [ '_c0',  '_c1',  '_c2',  '_c3',  '_c5',  '_c7',  '_c8',  '_c9', '_c10', '_C11', '_c12', '_c13', '_c22', '_C32', '_c34', '_c38', '_c40',
   '_c43', '_c46', '_c47', '_c50', '_c52', '_c53', '_c54', '_c56', '_c57', '_c59', '_c62', '_c63','_c77', '_c81','_c83'] 

df1=df_load_1.select(*[col])

检查空值,我们有任何删除它们

df_agg_1 = df1.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df1.columns])

df_agg_1.show()

df1 = df1.na.drop()

将预处理的数据写入hdfs,同一个集群但不同的目录

df1.write.csv("hdfs://nm/pyspark_cleaned_data/py_in_gateway.csv")

从hdfs中删除原始数据

def delete_path(spark , path):
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

通过传递hdfs绝对路径在下面执行

delete_path(spark , '/cdrs//cdrs/file_path/')

pyspark和hdfs命令
我能成功地完成所有的手术 pyspark 提示。
现在我想开发应用程序和提交工作使用Spark提交
例如

spark-submit --master yarn --deploy-mode client project.py for local 

spark-submit --master yarn --deploy-mode cluster project.py for cluster

在这一点上,我卡住了,我不知道什么参数,我应该通过在Spark提交到位纱。我不确定是否只是简单地复制和粘贴所有上述命令并使 .py 文件会有帮助的。我对这项技术很陌生。

k5ifujac

k5ifujac1#

基本上你的spark工作将在集群上运行。spark2.4.4支持yarn、kubernetes、mesos和spark独立集群doc。 --master yarn 指定要将spark作业提交到一个簇。 --deploy-mode 指定是在工作节点(群集)上部署驱动程序,还是作为外部客户端(客户端)本地部署驱动程序(默认:客户端)

spark-submit --master yarn --deploy-mode client project.py for client mode 

spark-submit --master yarn --deploy-mode cluster project.py for cluster mode

spark-submit --master local project.py for local mode

你可以在提交spark作业时提供其他参数,比如 --driver-memory , --executor-memory , --num-executors 请在这里检查。

相关问题