我有两个大的配置单元表,我想用spark.sql连接它们。假设我们有表1和表2,表1有500万行,表2有7000万行。表是snappy格式的,并作为Parquet文件存储在配置单元中。
我想加入它们,并对某些列进行一些聚合,比如说计数所有行和列的平均值(例如doublecolumn),同时使用两个条件(比如col1、col2)进行过滤。
注意:我在一台机器上进行测试安装(虽然功能非常强大)。我预计集群中的性能可能会有所不同。
我的第一次尝试是使用spark sql,例如:
val stat = sqlContext.sql("select count(id), avg(doubleColumn) " +
" FROM db.table1 as t1 JOIN db.table2 " +
" ON t1.id = t2.id " +
" WHERE col1 = val1 AND col2 = val2").collect
不幸的是,即使我给每个执行器和驱动程序至少8gb内存,这个5分钟的运行也非常糟糕。我还尝试使用dataframe语法,并尝试先过滤行,然后只选择特定的列以获得更好的选择性,如:
//Filter first and select only needed column
val df = spark.sql("SELECT * FROM db.tab1")
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")
//Take the aggregations on the joined df
import org.apache.spark.sql.functions;
joined.agg(
functions.count("id").as("count"),
functions.avg("doubleColumn").as("average")
).show();
但这并没有显著的性能提升。如何提高join的性能?
执行spark.sql或dataframe语法的最佳方法是什么?
给更多的执行者或记忆会有帮助吗?
我应该使用缓存吗?
我缓存了dataframes tab1、tab2和join aggregation,它们都有显著的收益,但我认为缓存我的dataframes并不实用,因为我们对并发感兴趣,许多用户同时询问一些分析性查询。
是不是因为我在单节点上工作,而当我在集群上的生产环境中工作时,我的问题就会消失?
额外的问题:我用impala尝试了这个查询,它做了大约40秒,但是它比spark.sql好得多。 Impala 怎么能比星火更好?!
2条答案
按热度按时间shstlldc1#
执行spark.sql或dataframe语法的最佳方法是什么?
没有任何区别。
给更多的执行者或记忆会有帮助吗?
只有当问题不是由数据倾斜引起并且您正确地调整了配置时。
我应该使用缓存吗?
如果输入数据被多次重用,那么最好(正如您已经确定的那样)考虑性能。
是不是因为我在单节点上工作,而当我在集群上的生产环境中工作时,我的问题就会消失?
一般来说,在单个节点上进行性能测试是完全无用的。它忽略了瓶颈(网络io/通信)和优势(分摊磁盘i/o和资源使用)。
但是,您可以显著减少并行性(
spark.sql.shuffle.partitions
,sql.default.parallelism
增加了输入分割大小)。反直觉Spark式并行,是为分配负载而设计的,在单机上是一种负担,而不是一种资产。这取决于洗牌(磁盘写入!)与共享内存相比,通信速度非常慢,调度开销非常大。Impala 怎么能比星火更好?!
因为它是专门为低延迟并发查询设计的。它从来不是spark(数据库vs.etl框架)的目标。
像你一样
由于我们对并发感兴趣,许多用户同时询问一些分析性查询。
spark听起来不是个正确的选择。
bgibtngc2#
您可以更改配置,而且您必须在大型集群上更改它们。我能马上想到两件事。套
spark.executor.cores
也取决于内存,给更多的执行者和内存spark.executor.instances
以及spark.executor.memory
. 还可以按列对配置单元表进行存储和排序吗?如果您将表存储在bucket中,那么它将消除在加入表之前对表进行排序的需要。如果在连接之后缓存Dataframe,速度可能会更快,这取决于catalyst处理聚合查询的方式。你可以
unpersist()
同样在查询结束后,但我同意gc可能不值得这样做。使用sql或scala dsl不会有任何好处。两者都使用全阶段代码生成,因此本质上是相同的。
impala总是更快的一个原因是它从不担心复制问题,尽管有一个节点不应该那么麻烦,但是spark在预共享复制数据和不需要复制之间可能没有一个优雅的分离。