我有以下代码来使用spark将json存储到cassandra
ss.read().json("test_data.json").write()
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.option("table", table)
.option("keyspace", KEY_SPACE)
.option("confirm.truncate", true)
.save();
表有一个主键,当记录的主键值为空时, save()
引发异常 TypeConversionException Cannot convert object [null,null,null,null,null,n..., "test text test text" type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to List[AnyRef]
对我来说,很明显,这个记录应该被过滤掉,或者在异常发生时记录下来。对我来说,问题是我找不到捕获此异常的方法,然后我就可以记录脏记录了。 sc.read().json("test_data.json").na().drop()
没有帮助,因为记录中有一些数据。
我发现有一个 saveToCassandra()
方法,该方法可能有实现异常处理程序的方法,但在我的sparksession中找不到它。
SparkSession ss = SparkSession
.builder()
.config("spark.cassandra.connection.host", cassandraHost)
.config("spark.master", "local")
.getOrCreate();
我使用最新的spark版本2.3.2。
暂无答案!
目前还没有任何答案,快来回答吧!