大家好首先我知道这个线程的存在,任务只在spark中的一个执行器上运行。
然而,这不是我的情况,因为我正在使用 repartition(n)
在我的数据框上。
基本上,我是通过spark从elasticsearch索引中获取数据来加载Dataframe的,如下所示:
spark = SparkSession.builder \
.appName("elastic") \
.master("yarn")\
.config('spark.submit.deployMode','client')\
.config("spark.jars",pathElkJar) \
.enableHiveSupport() \
.getOrCreate()
es_reader = (spark.read
.format("org.elasticsearch.spark.sql")
.option("es.read.field.include",includeFieldsString)
.option("es.query",q)
.option("es.nodes",elasticClusterIP)
.option("es.port",port)
.option("es.resource",indexNameTable)
.option("es.nodes.wan.only" , 'true')
.option("es.net.ssl", 'true')
.option("es.net.ssl.cert.allow.self.signed", "true")
.option("es.net.http.auth.user" ,elkUser )
.option("es.net.http.auth.pass" , elkPassword)
.option("es.read.metadata", "false")
.option("es.read.field.as.array.include","system_auth_hostname")
#.option("es.mapping.exclude", "index")
#.option("es.mapping.id", "_id")
#.option("es.read.metadata._id","_id")
#.option("delimiter", ",")
#.option("inferSchema","true")
#.option("first_row_is_header","true")
)
df = es_reader.load()
yarn在默认情况下正确地将2个执行器添加到我的应用程序中,因为我没有指定其他执行器。从elasticsearch加载数据时,df没有分区,因此我运行以下命令检查执行器行为:
df = df.repartition(2)
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
>> Number of partitions: 2
df.count()
我希望从spark ui看到两个执行者都在 count()
任务,但是我得到一个奇怪的行为,我有三个任务完成,没有重新分区它的两个任务的一个行动,其中第一个和更长的一个是由一个执行者运行,你可以看到在下面的图片:计数()-twoexecutor twopartitions。
如果我从一个已分为两个部分的配置单元表(os:linux/windows)中保存和加载数据,则一切正常:
df.write.mode('overwrite').format("parquet").partitionBy('OS').saveAsTable('test_executors')
df2 = spark.read.load("path")
df2.count()
在本例中,我得到以下结果:count()-twoexecutor-twopartitions\u loadfromhivetable
我得到了两个执行者想要的行为 count()
任务。
问题似乎出在 repartition(n)
我认为它正确地划分了df,我查过了 df.rdd.getNumPartitions()
但不会在执行者之间并行工作。
如有必要,我可以提供详细的任务中附加的图像。提前谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!