在apachehadoop上运行具有本地性质的spark查询时出现数据本地性问题

pw136qt2  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(358)

我有一个hadoop集群,它使用apachespark查询保存在hadoop上的parquet文件。例如,当我使用以下pyspark代码在Parquet文件中查找单词时:

df = spark.read.parquet("hdfs://test/parquets/*")
df.filter(df['word'] == "jhon").show()

运行完这段代码后,我转到spark应用程序ui的“阶段”选项卡。我看到地方一级的夏天开始了 Any. 相反,由于此查询的性质,它必须在本地和服务器上运行 NODE_LOCAL 至少是地方级别。当我在运行这个时检查集群的网络io时,我发现这个查询使用网络(当查询运行时网络io增加)。这种情况的奇怪之处在于spark ui的shuffle部分中显示的数字是最小的。
在apachespark邮件列表中russellspitzer的帮助下,我运行了以下代码来查找每个分区的首选位置。这段代码的结果使我离解决这个问题又近了一步。我发现首选位置是ip形式的,而不是主机名,但是spark使用执行者的ip来匹配首选位置并实现数据位置。

scala> def getRootRdd( rdd:RDD[_] ): RDD[_]  = { if(rdd.dependencies.size == 0) rdd else getRootRdd(rdd.dependencies(0).rdd)}
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]

scala> val rdd = spark.read.parquet("hdfs://test/parquets/*").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] at rdd at <console>:24

scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24

scala> scan.partitions.map(scan.preferredLocations)
res12: Array[Seq[String]] = Array(WrappedArray(datanode-1, datanode-2, datanode-3), WrappedArray(datanode-2, datanode-3, datanode-4), WrappedArray(datanode-3, datanode-4, datanode-5),...

现在,我尝试找到方法让spark首先解析主机名,然后将它们与执行器的ip相匹配。有什么建议吗?

zbdgwd5y

zbdgwd5y1#

这个问题的产生是因为spark在hadoop中对于分区的首选位置是datanode hostname,但是spark worker通过ip注册到spark master。spark正在尝试用本地分区绘制要在执行器上运行的任务。因为执行器Map到IP,分区Map到主机名,所以调度程序无法将IP与主机名匹配,并且任务总是在“任意”位置级别上运行。要解决这一问题,我们必须用适当的方法来解决 -h [hostname] 旗帜。这样,工人在主机上注册的主机名就代替了ip,解决了这个问题。

相关问题