通过java+spark+sparksession在cassandra表中插入/更新行的最佳方法是什么

ha5z0ras  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(479)

这是通过java+spark+sparksession从cassandra表中获取数据的方法:

SparkSession spark = SparkSession
          .builder()
          .appName("JavaDemoDataSet")
          .config("spark.sql.warehouse.dir", "/file:C:/temp")
          .config("spark.cassandra.connection.host", "127.0.0.1")
          .config("spark.cassandra.connection.port", "9042")
          .master("local[2]")
          .getOrCreate();

 Dataset<Row> dataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "chat");
                put("table", "dictionary");
            }
        })
        .load()
        .filter("value_id BETWEEN 1 AND 5");

但是当我研究如何在这个表中添加或修改行(至少1行)时,我找不到最好的方法。例如,我正在用gui开发一个简单的应用程序,我需要向“dictionary”表添加一个新值。因此,在这种情况下,从我的Angular 来看,我不需要数据集来完成这项工作。
当我研究如何通过sparksession添加一行时,我找不到java+spark+sparksession的例子来说明如何做到这一点。我当然可以用cql语句via语句来实现这一点,但是哪种方法最好更新或添加1或2行呢?尤其是当我用sparksession来读的时候。
如果可能的话,我会非常感谢你的例子(甚至是超链接,我做了很多研究,但可能我错过了一些重要的东西),因为我对所有这些都很陌生。
谢谢您!

imzjd6km

imzjd6km1#

下面是使用java+sparksession+cassandraconnector保存和读取的示例。

public class SparkCassandraDatasetApplication {
public static void main(String[] args) {
     SparkSession spark = SparkSession
      .builder()
      .appName("SparkCassandraDatasetApplication")
      .config("spark.sql.warehouse.dir", "/file:C:/temp")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .config("spark.cassandra.connection.port", "9042")
      .master("local")
      .getOrCreate();

    //Data
    MyData data = new MyData();
    data.setId("111");
    data.setUsername("userOne");
    List<MyData> users = Arrays.asList(data);
    Dataset<MyData> datasetWrite = spark.createDataset(users, Encoders.bean(MyData.class));

    //Save data to Cassandra
    datasetWrite.write().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {
        {
            put("keyspace", "mykeyspace");
            put("table", "mytable");
        }
    }).mode(SaveMode.Append).save();

    //Read data back
    Dataset<Row> datasetRead = spark.read().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "mykeyspace");
                    put("table", "mytable");
                }
            }).load();

    datasetRead.show();
    spark.stop();
   }
}
li9yvcax

li9yvcax2#

我强烈建议不要使用spark进行单行更新。内置的连接器方法是面向大量数据的,对于单行更改可能效率很低。。您最好直接使用驱动程序或使用cassandraconnector接口。
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-手动发送到cassandra

相关问题