spark cassandra连接器:机具scd类型1

8mmmxcuj  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(334)

我是cassandra的新手,我想在cassandra db中实现scd type-1。
此scd type1作业将从spark执行。
数据将存储为时间序列分区数据。即:年/月/日
示例:我有过去300天的记录,我的新记录可能既有新记录,也有更新的记录。我想比较过去100天的更新记录,如果记录是新的,那么它应该执行插入操作或更新。
我没有得到任何执行此操作的线索,因此没有共享任何cql:(
示例表结构为:

CREATE TABLE crossfit_gyms_by_city_New (  
 country_code text,  
 state_province text,  
 city text,  
 gym_name text,  
 PRIMARY KEY ((country_code, state_province), gym_name)  
) WITH CLUSTERING ORDER BY (gym_name ASC );

我的示例spark代码:

object SparkUpdateCassandra {
  System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

  def main(args: Array[String]): Unit = {
    val spark = org.apache.spark.sql.SparkSession
      .builder()
      .master("local[*]")
      .config("spark.cassandra.connection.host", "localhost")
      .appName("Spark Cassandra Connector Example")
      .getOrCreate()

    import spark.implicits._

    //Read Cassandra data using DataFrame
    val FirstDF = Seq(("India", "WB", "Kolkata", "Cult Fit"),("India", "KA", "Bengaluru", "Cult Fit")).toDF("country_code", "state_province","city","gym_name")
    FirstDF.show(10)
    FirstDF.write
          .format("org.apache.spark.sql.cassandra")
          .mode("append")
          .option("confirm.truncate", "true")
          .option("spark.cassandra.connection.host", "localhost")
          .option("spark.cassandra.connection.port", "9042")
          .option("keyspace", "emc_test")
          .option("table", "crossfit_gyms_by_city_new")
          .save()
    val loaddf1 = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
      .load()
    loaddf1.show(10)

//    spark.implicits.wait(5000)

    val SecondDF = Seq(("India", "WB", "Siliguri", "CultFit"),("India", "KA", "Bengaluru", "CultFit")).toDF("country_code", "state_province","city","gym_name")
    SecondDF.show(10)

    SecondDF.write
      .format("org.apache.spark.sql.cassandra")
      .mode("append")
      .option("confirm.truncate", "true")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .option("keyspace", "emc_test")
      .option("table", "crossfit_gyms_by_city_new")
      .save()

    val loaddf2 = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
      .load()
    loaddf2.show(10)

  }
}

注意:我使用scala作为spark框架。

g52tjvyc

g52tjvyc1#

在cassandra中,一切都是向上插入的-如果行不存在,它将被插入,如果它存在,那么它将被更新,所以您只需要将数据导入rdd或dataframe,并使用spark cassandra connector的相应功能:
用于rdd api的savetocassandra:

rdd.saveToCassandra("keyspace", "table")

或者只是 write indataframe api:

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_name", "keyspace" -> "keyspace_name"))
  .mode(SaveMode.Append)
  .save()
vohkndzv

vohkndzv2#

为了实现这一点,有一些事实可以帮助您浏览将遇到的代码示例
在前面的spark 1代码中,我们将使用
1 sparkcontext见文档
2要连接到cassandra,请使用用sparkcontext构造的cassandrasqlcontext
对于spark 2来说,这种情况已经发生了很大的变化
创建spark会话和cassandraconnector[1]
然后,您可以使用[1]中所示的会话运行本机sql
一旦设置并运行了这个命令,您就可以为scd类型1操作执行适当的sql,可以找到相关sql的好例子。

相关问题