我的代码的算法如下
第一步。获取一个hbase实体数据到hbaserdd
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
第二步。将hbaserdd转换为rowpairdd
// in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data
JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
.mapToPair(***);
dataRDD.repartition(500);
dataRDD.cache();
第三步。将rowpairdd转换为schemardd
JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
schemaRDD.registerTempTable("testentity");
sqlContext.sqlContext().cacheTable("testentity");
第四步。使用sparksql执行第一个简单的sql查询。
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE
column3 = 'value1' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
第五步。使用sparksql执行第二个简单的sql查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity
WHERE column3 = 'value2' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
第六步。使用sparksql执行第三个简单的sql查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' ");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
试验结果如下:
测试用例1:
当我插入300000条记录时,hbase实体,然后运行代码。
第一次查询需要60407ms
第二个查询需要838毫秒
3td查询需要792ms
如果我使用hbase api进行类似的查询,只需要2000 ms。显然,最后2个spark sql查询比hbase api查询快得多。
我相信第一个sparksql查询会花费大量时间从hbase加载数据。
所以第一个查询比最后两个查询慢得多。我想结果是意料之中的
测试用例2:
当我插入40万条记录时。hbase实体,然后运行代码。
第一次查询需要87213毫秒
第二个查询需要83238ms
3td查询需要82092 ms
如果我使用hbase api进行类似的查询,只需要3500毫秒。显然,3个spark sql查询比hbase api查询慢得多。
最后两个sparksql查询也非常慢,性能与第一个查询相似,为什么?我如何调整性能?
2条答案
按热度按时间rwqw0loc1#
我怀疑您正在尝试缓存比已分配给spark示例的数据更多的数据。我将尝试分解在执行完全相同的查询时发生的事情。
首先,spark中的一切都是懒惰的。这意味着当你打电话的时候
rdd.cache()
,除非你用rdd做点什么,否则什么都不会发生。第一个查询
完全hbase扫描(慢)
增加分区数(导致无序,缓慢)
数据实际上被缓存到内存中,因为spark是懒惰的(有点慢)
应用where predicate (fast)
收集结果
第二/第三查询
全内存扫描(快速)
应用where predicate (fast)
收集结果
现在,spark将尝试缓存尽可能多的rdd。如果它不能缓存整个东西,你可能会遇到一些严重的减速。如果缓存之前的某个步骤导致了洗牌,则尤其如此。对于后续的每个查询,您可能会在第一个查询中重复步骤1-3。这不太理想。
要查看是否没有完全缓存rdd,请转到spark web ui(
http://localhost:4040
如果处于本地独立模式)并查找rdd存储/持久性信息。确保它是100%。编辑(根据评论):
我的hbase中有400000个数据大小,只有大约250mb。为什么我需要使用2g来修复这个问题(但是1g>>250mb)
我不能肯定你为什么会达到你的极限
spark.executor.memory=1G
,但我将添加一些有关缓存的更相关的信息。spark只将执行器堆内存的一部分分配给缓存。默认情况下,这是
spark.storage.memoryFraction=0.6
或60%。所以你真的只有1GB * 0.6
.hbase中使用的总空间可能与spark中缓存时占用的总堆空间不同。默认情况下,spark在内存中存储时不会序列化java对象。因此,在存储java时会有相当大的开销
Object
元数据。您可以更改默认的持久性级别。您知道如何缓存所有数据以避免第一次查询的性能不佳吗?
调用任何操作都会导致rdd被缓存。就这么做吧
现在它被缓存了。
fcg9iug32#
我希望您在一次运行中一个接一个地运行这些查询,如果是,为什么要为每个查询创建单独的sqlcontext?您还可以尝试重新分区rdd,这将增加并行性。如果可能,还可以缓存rdd。
希望以上步骤能提高性能。