我将spark连接到cassandra,我可以用传统的复制方法打印csv的行。但是,如果csv像大数据中通常发生的那样非常大,那么如何才能每两行加载两行csv文件,以避免冻结相关问题等?
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
object SparkCassandra {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkCassandra").setMaster("local").set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val my_rdd = sc.cassandraTable("my_keyspace", "my_csv")
my_rdd.take(20).foreach(println)
sc.stop()
}
}
我们应该使用时间变量或类似的东西吗?
1条答案
按热度按时间eiee3dmh1#
如果您只想将数据加载到cassandra中,或者使用命令行从cassandra卸载数据,我建议您使用datastax bulk loader(dsbulk)——它对从cassandra/dse加载数据进行了大量优化。它同时适用于开源cassandra和dse。
在最简单的情况下,从表中加载和卸载将显示为(默认格式为csv):
对于更复杂的情况,您可能需要提供更多选项。您可以在下面的一系列博客文章中找到更多信息。
如果您想用spark实现这一点,那么我建议使用dataframeapi而不是rdds。在这种情况下,您只需使用标准
read
&write
功能。要将数据从cassandra导出到csv:
或从csv读取并存储在cassandra中: