来自elasticsearch的Spark负载:执行器和分区的数量

rks48beu  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(450)

我正在尝试将elasticsearch索引中的数据加载到spark中的Dataframe中。我的机器有12个cpu和1个内核。我正在jupyter笔记本上使用Pypark,其spark配置如下:

pathElkJar = currentUserFolder+"/elasticsearch-hadoop-"+connectorVersion+"/dist/elasticsearch- spark-20_2.11-"+connectorVersion+".jar"

spark = SparkSession.builder \
    .appName("elastic") \
    .config("spark.jars",pathElkJar) \
    .enableHiveSupport() \
    .getOrCreate()

现在不管我做什么:

df = es_reader.load()

或:

df = es_reader.load(numPartitions=12)

我从以下打印中得到相同的输出:

print('Master: {}'.format(spark.sparkContext.master))
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
print('Number of executors:{}'.format(spark.sparkContext._conf.get('spark.executor.instances')))
print('Partitioner: {}'.format(df.rdd.partitioner))
print('Partitions structure: {}'.format(df.rdd.glom().collect()))

Master: local[*]
Number of partitions: 1
Number of executors: None
Partitioner: None

我期待12个分区,我只能通过做一个 repartition() 在Dataframe上。此外,我认为默认情况下执行器的数量等于cpu的数量。但即使这样做:

spark.conf.set("spark.executor.instances", "12")

我无法手动设置执行者的数量。这是真的,我有一个核心的12个cpu的每一个,但我应该如何去呢?
我在创建spark会话后修改了配置文件(没有重新启动,这显然不会导致任何更改),通过如下指定executor的数量:

spark = SparkSession.builder \
    .appName("elastic") \
    .config("spark.jars",pathElkJar) \
    .config("spark.executor.instances", "12") \
    .enableHiveSupport() \
    .getOrCreate()

我现在有12个遗嘱执行人。我仍然不明白为什么它不能自动执行,而且加载Dataframe时的分区数仍然是1。我希望遗嘱执行人的人数是12人,对吗?

clj7thdc

clj7thdc1#

关于执行器和分区的问题产生于这样一个事实:我在本地模式下使用spark,它允许最多一个执行器。使用yarn或mesos等其他资源管理器解决了这个问题

相关问题