我在将分区过滤器应用于spark(v2.0.2/2.1.1)Dataframe时遇到了一个问题,spark(v2.0.2/2.1.1)Dataframe从具有30000多个分区的hive(v2.1.0)表中读取数据。我想知道推荐的方法是什么,如果有的话,我做得不对,因为当前的行为是一个大的性能和可靠性问题的来源。
要启用修剪,我将使用以下spark/hive属性:
--conf spark.sql.hive.metastorePartitionPruning=true
在sparkshell中运行查询时,我可以看到通过调用 ThriftHiveMetastore.Iface.get_partitions
,但这在没有任何筛选的情况下意外发生:
val myTable = spark.table("db.table")
val myTableData = myTable
.filter("local_date = '2017-09-01' or local_date = '2017-09-02'")
.cache
// The HMS call invoked is:
// #get_partitions('db', 'table', -1)
如果我使用更简单的过滤器,分区会根据需要进行过滤:
val myTableData = myTable
.filter("local_date = '2017-09-01'")
.cache
// The HMS call invoked is:
// #get_partitions_by_filter(
// 'db', 'table',
// 'local_date = "2017-09-01"',
// -1
// )
如果我重写过滤器以使用范围运算符而不是简单地检查相等性,则过滤也可以正常工作:
val myTableData = myTable
.filter("local_date >= '2017-09-01' and local_date <= '2017-09-02'")
.cache
// The HMS call invoked is:
// #get_partitions_by_filter(
// 'db', 'table',
// 'local_date >= '2017-09-01' and local_date <= '2017-09-02'',
// -1
// )
在我们的例子中,从性能的Angular 来看,这种行为是有问题的;正确过滤后,通话时间为4分钟,而不是1秒。此外,常规装载大量 Partition
每次查询都将对象放在堆上,最终会导致metastore服务中的内存问题。
似乎在解析和解释某些类型的过滤器结构时有一个bug,但是我还没有在spark jira中找到相关的问题。是否有一个优先的方法或特定的Spark版本,过滤器适用于所有的过滤器变种?或者在构造过滤器时必须使用特定的形式(例如范围运算符)?如果是这样的话,这个限制是否在任何地方都有记录?
1条答案
按热度按时间1bqhqjot1#
我还没有找到一个首选的查询方式,除了重写过滤器在我的(操作)问题中所描述的。我确实发现spark改进了对此的支持,看起来spark2.3.0中已经解决了我的问题。这是解决我发现的问题的罚单:spark-20331