spark任务只在一个执行器上运行

gjmwrych  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(440)

大家好首先我知道这个线程的存在,任务只在spark中的一个执行器上运行。
然而,这不是我的情况,因为我正在使用 repartition(n) 在我的数据框上。
基本上,我是通过spark从elasticsearch索引中获取数据来加载Dataframe的,如下所示:

spark = SparkSession.builder \
    .appName("elastic") \
    .master("yarn")\
    .config('spark.submit.deployMode','client')\
    .config("spark.jars",pathElkJar) \
    .enableHiveSupport() \
    .getOrCreate()

es_reader = (spark.read
        .format("org.elasticsearch.spark.sql")
        .option("es.read.field.include",includeFieldsString)
        .option("es.query",q)
        .option("es.nodes",elasticClusterIP)
        .option("es.port",port)
        .option("es.resource",indexNameTable)
        .option("es.nodes.wan.only" , 'true')
        .option("es.net.ssl", 'true')
        .option("es.net.ssl.cert.allow.self.signed", "true")
        .option("es.net.http.auth.user" ,elkUser )
        .option("es.net.http.auth.pass" , elkPassword)
        .option("es.read.metadata", "false")
        .option("es.read.field.as.array.include","system_auth_hostname")
        #.option("es.mapping.exclude", "index")
        #.option("es.mapping.id", "_id")
        #.option("es.read.metadata._id","_id")
        #.option("delimiter", ",")
        #.option("inferSchema","true")
        #.option("first_row_is_header","true")
        )

df = es_reader.load()

yarn在默认情况下正确地将2个执行器添加到我的应用程序中,因为我没有指定其他执行器。从elasticsearch加载数据时,df没有分区,因此我运行以下命令检查执行器行为:

df = df.repartition(2)
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
>> Number of partitions: 2
df.count()

我希望从spark ui看到两个执行者都在 count() 任务,但是我得到一个奇怪的行为,我有三个任务完成,没有重新分区它的两个任务的一个行动,其中第一个和更长的一个是由一个执行者运行,你可以看到在下面的图片:计数()-twoexecutor twopartitions。
如果我从一个已分为两个部分的配置单元表(os:linux/windows)中保存和加载数据,则一切正常:

df.write.mode('overwrite').format("parquet").partitionBy('OS').saveAsTable('test_executors')
df2 = spark.read.load("path")
df2.count()

在本例中,我得到以下结果:count()-twoexecutor-twopartitions\u loadfromhivetable
我得到了两个执行者想要的行为 count() 任务。
问题似乎出在 repartition(n) 我认为它正确地划分了df,我查过了 df.rdd.getNumPartitions() 但不会在执行者之间并行工作。
如有必要,我可以提供详细的任务中附加的图像。提前谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题