使用javaspark将数据集保存到cassandra

f0brbegy  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(576)

我正在尝试使用JavaSpark将数据集保存到CassandraDB。我能够使用下面的代码成功地将数据读入数据集

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

但当我尝试编写数据集时,得到了ioexception:无法加载或查找表,在keyspace中找到了类似的表

Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();

我在sparksession中设置主机和端口,我可以在覆盖和附加模式下写入,但不能创建表
我使用的版本如下:spark java 2.0 spark cassandra connector 2.3
尝试了不同的jar版本,但没有任何效果我也经历了不同的堆栈溢出和github链接
非常感谢您的帮助。

bqf10yzr

bqf10yzr1#

这个 write spark中的操作没有自动为您创建表的模式-这有多种原因。其中之一是需要为表定义主键,否则,如果设置了不正确的主键,则可能只会覆盖数据。因此,spark-cassandra连接器提供了一种基于Dataframe结构创建表的单独方法,但是您需要提供分区和集群键列的列表。在java中,它将如下所示(完整的代码在这里):

DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
          CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
          partitionSeqlist, clusteringSeqlist, connector);

然后你可以像往常一样写数据:

dataset.write()
   .format("org.apache.spark.sql.cassandra")
   .options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
   .save();

相关问题