我正在使用emr4.3上的spark1.6来查询属于hive元存储(由s3中的gzipParquet文件支持)中的表的约15tb的数据。对于我的集群,我有一个r3.8xlarge主节点和15个r3.8xlarge核心节点(3.6tbram,9.6tbssd)。
大约15 TB的数据包含在大约90亿行中。每行约有15列存储长度为5-50的字符串,一列包含约30个字符串(每个字符串10-20个字符)的数组。数组中仅存储约100万个唯一字符串。我要做的只是计算数组列中的唯一字符串,但似乎内存不足,因为我不断得到:outofmemoryerror:无法在执行器上创建新的本机线程。由于内存不足错误,任务失败,执行器被禁用,然后作业失败。
当我查询5-10tb的数据时,它可以工作。我一定不能正确理解存储在内存中的内容(这正是我想弄清楚的)。顺便说一句,在上面的集群中,我设置:
spark.executor.memory 30g
spark.executor.cores 5
spark.executor.instances 90 // 6 instances per r3.8xlarge host
我不认为sparksql将中间表存储在内存中。由于没有超过1m个唯一的字符串,我认为带计数的字符串应该很容易放入内存中。问题是:
val initial_df = sqlContext.sql("select unique_strings_col from Table where timestamp_partition between '2016-09-20T07:00:00Z' and '2016-09-23T07:00:00Z'")
initial_df.registerTempTable("initial_table") // ~15TB compressed data to read in from S3
val unique_strings_df = sqlContext.sql("select posexplode(unique_strings_col) as (string_pos, string) from initial_table").select($"string_pos", $"string")
unique_strings_df.registerTempTable("unique_strings_table") // ~70% initial data remaining at this point
val strings_count_df = sqlContext.sql("select string, count(*) as unique_string_count from unique_strings_table where string_pos < 21 group by string order by unique_string_count desc") // ~50% initial data remaining at this point
strings_count_df.write.parquet("s3://mybucket/counts/2016-09-20-2016-09-23")
压缩的Parquet文件很小(比如每个5mb)。似乎可以一次读取一个,过滤,并存储它们的计数。我错过了什么?
1条答案
按热度按时间cnwbcb6i1#
因此,我需要有足够的磁盘+内存空间来存储初始rdd。如果我在创建temp表之前在初始rdd中进行更多的预先过滤,我就能够成功地运行查询。耶!