我正在尝试将elasticsearch索引中的数据加载到spark中的Dataframe中。我的机器有12个cpu和1个内核。我正在jupyter笔记本上使用Pypark,其spark配置如下:
pathElkJar = currentUserFolder+"/elasticsearch-hadoop-"+connectorVersion+"/dist/elasticsearch- spark-20_2.11-"+connectorVersion+".jar"
spark = SparkSession.builder \
.appName("elastic") \
.config("spark.jars",pathElkJar) \
.enableHiveSupport() \
.getOrCreate()
现在不管我做什么:
df = es_reader.load()
或:
df = es_reader.load(numPartitions=12)
我从以下打印中得到相同的输出:
print('Master: {}'.format(spark.sparkContext.master))
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
print('Number of executors:{}'.format(spark.sparkContext._conf.get('spark.executor.instances')))
print('Partitioner: {}'.format(df.rdd.partitioner))
print('Partitions structure: {}'.format(df.rdd.glom().collect()))
Master: local[*]
Number of partitions: 1
Number of executors: None
Partitioner: None
我期待12个分区,我只能通过做一个 repartition()
在Dataframe上。此外,我认为默认情况下执行器的数量等于cpu的数量。但即使这样做:
spark.conf.set("spark.executor.instances", "12")
我无法手动设置执行者的数量。这是真的,我有一个核心的12个cpu的每一个,但我应该如何去呢?
我在创建spark会话后修改了配置文件(没有重新启动,这显然不会导致任何更改),通过如下指定executor的数量:
spark = SparkSession.builder \
.appName("elastic") \
.config("spark.jars",pathElkJar) \
.config("spark.executor.instances", "12") \
.enableHiveSupport() \
.getOrCreate()
我现在有12个遗嘱执行人。我仍然不明白为什么它不能自动执行,而且加载Dataframe时的分区数仍然是1。我希望遗嘱执行人的人数是12人,对吗?
1条答案
按热度按时间clj7thdc1#
关于执行器和分区的问题产生于这样一个事实:我在本地模式下使用spark,它允许最多一个执行器。使用yarn或mesos等其他资源管理器解决了这个问题