我的要求是使用pyspark从hdfs读取数据,只过滤所需的列,删除空值,然后将处理后的数据写回hdfs。一旦这些步骤完成,我们需要从hdfs中删除原始脏数据。这是我的每个操作的脚本。
导入库和依赖项
# Spark Version = > version 2.4.0-cdh6.3.1
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()
import pyspark.sql.functions as F
从hdfs读取数据
df_load_1 = sparkSession.read.csv('hdfs:///cdrs/file_path/*.csv', sep = ";")
仅选择所需的列
col = [ '_c0', '_c1', '_c2', '_c3', '_c5', '_c7', '_c8', '_c9', '_c10', '_C11', '_c12', '_c13', '_c22', '_C32', '_c34', '_c38', '_c40',
'_c43', '_c46', '_c47', '_c50', '_c52', '_c53', '_c54', '_c56', '_c57', '_c59', '_c62', '_c63','_c77', '_c81','_c83']
df1=df_load_1.select(*[col])
检查空值,我们有任何删除它们
df_agg_1 = df1.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df1.columns])
df_agg_1.show()
df1 = df1.na.drop()
将预处理的数据写入hdfs,同一个集群但不同的目录
df1.write.csv("hdfs://nm/pyspark_cleaned_data/py_in_gateway.csv")
从hdfs中删除原始数据
def delete_path(spark , path):
sc = spark.sparkContext
fs = (sc._jvm.org
.apache.hadoop
.fs.FileSystem
.get(sc._jsc.hadoopConfiguration())
)
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)
通过传递hdfs绝对路径在下面执行
delete_path(spark , '/cdrs//cdrs/file_path/')
pyspark和hdfs命令
我能成功地完成所有的手术 pyspark
提示。
现在我想开发应用程序和提交工作使用Spark提交
例如
spark-submit --master yarn --deploy-mode client project.py for local
spark-submit --master yarn --deploy-mode cluster project.py for cluster
在这一点上,我卡住了,我不知道什么参数,我应该通过在Spark提交到位纱。我不确定是否只是简单地复制和粘贴所有上述命令并使 .py
文件会有帮助的。我对这项技术很陌生。
1条答案
按热度按时间k5ifujac1#
基本上你的spark工作将在集群上运行。spark2.4.4支持yarn、kubernetes、mesos和spark独立集群doc。
--master yarn
指定要将spark作业提交到一个簇。--deploy-mode
指定是在工作节点(群集)上部署驱动程序,还是作为外部客户端(客户端)本地部署驱动程序(默认:客户端)你可以在提交spark作业时提供其他参数,比如
--driver-memory
,--executor-memory
,--num-executors
请在这里检查。