如何在每两行中加载csv两行

yrdbyhpb  于 2021-06-09  发布在  Cassandra
关注(0)|答案(1)|浏览(324)

我将spark连接到cassandra,我可以用传统的复制方法打印csv的行。但是,如果csv像大数据中通常发生的那样非常大,那么如何才能每两行加载两行csv文件,以避免冻结相关问题等?

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SparkCassandra {

  def main(args: Array[String]): Unit = {

      val conf = new SparkConf().setAppName("SparkCassandra").setMaster("local").set("spark.cassandra.connection.host", "localhost")
      val sc = new SparkContext(conf)
      val my_rdd = sc.cassandraTable("my_keyspace", "my_csv")
      my_rdd.take(20).foreach(println)
      sc.stop()
  }
}

我们应该使用时间变量或类似的东西吗?

eiee3dmh

eiee3dmh1#

如果您只想将数据加载到cassandra中,或者使用命令行从cassandra卸载数据,我建议您使用datastax bulk loader(dsbulk)——它对从cassandra/dse加载数据进行了大量优化。它同时适用于开源cassandra和dse。
在最简单的情况下,从表中加载和卸载将显示为(默认格式为csv):

dsbulk load -k keyspace -t table -url my_file.csv
dsbulk unload -k keyspace -t table -url my_file.csv

对于更复杂的情况,您可能需要提供更多选项。您可以在下面的一系列博客文章中找到更多信息。
如果您想用spark实现这一点,那么我建议使用dataframeapi而不是rdds。在这种情况下,您只需使用标准 read & write 功能。
要将数据从cassandra导出到csv:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl", "ks").load()
data.write.format("csv").save("my_file.csv")

或从csv读取并存储在cassandra中:

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode
val data = spark.read.format("csv").save("my_file.csv")
data.cassandraFormat("tbl", "ks").mode(SaveMode.Append).save()

相关问题