我试图使用spark调用cassandra表的并行读取。但是我不能调用并行性,因为在给定的时间内只有一次读取。为了实现同样的目标,应该采取什么方法?
yeotifhr1#
我建议你使用下面的方法,来源于拉塞尔·斯皮策的博客使用部分扫描的联合来手动划分分区:将任务推送到最终用户也是一种可能性(以及当前的解决方法)。大多数最终用户已经理解了为什么他们有长分区,并且知道他们的列值所在的域。这使得他们可以手动划分一个请求,这样它就可以分割大的分区。例如,假设用户知道聚类列c的跨度从1到1000000。他们可以写这样的代码
val minRange = 0 val maxRange = 1000000 val numSplits = 10 val subSize = (maxRange - minRange) / numSplits sc.union( (minRange to maxRange by subSize) .map(start => sc.cassandraTable("ks", "tab") .where("c > $start and c < ${start + subSize}")) )
每个rdd将包含一组唯一的任务,这些任务只绘制完整分区的一部分。union操作将所有这些不同的任务连接到一个rdd中。任何单个spark分区从单个cassandra分区提取的最大行数将限制为maxrange/numsplits。这种方法虽然需要用户干预,但可以保持局部性,并且仍然可以最小化磁盘扇区之间的跳转。同时读取调谐参数
1条答案
按热度按时间yeotifhr1#
我建议你使用下面的方法,来源于拉塞尔·斯皮策的博客
使用部分扫描的联合来手动划分分区:将任务推送到最终用户也是一种可能性(以及当前的解决方法)。大多数最终用户已经理解了为什么他们有长分区,并且知道他们的列值所在的域。这使得他们可以手动划分一个请求,这样它就可以分割大的分区。
例如,假设用户知道聚类列c的跨度从1到1000000。他们可以写这样的代码
每个rdd将包含一组唯一的任务,这些任务只绘制完整分区的一部分。union操作将所有这些不同的任务连接到一个rdd中。任何单个spark分区从单个cassandra分区提取的最大行数将限制为maxrange/numsplits。这种方法虽然需要用户干预,但可以保持局部性,并且仍然可以最小化磁盘扇区之间的跳转。
同时读取调谐参数