读取带有空值udt的cassandra表,并Map到spark中的scala case类

9o685dep  于 2021-06-13  发布在  Cassandra
关注(0)|答案(1)|浏览(251)

错误显示:
原因:java.lang.nullpointerexception:请求了GetTableToAppedTypeConverter的typetag,由于scala 2.10 typetag限制,该typetag无法反序列化typetag。它们返回为空,因此你看到这个npe。
gradle.build公司

dependencies {
    implementation group: 'org.scala-lang', name: 'scala-library', version: '2.12.11'
    implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.5'
    implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.5'
    implementation group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.12', version: '2.5.0'
    implementation group: 'org.apache.spark', name: 'spark-mllib_2.12', version: '2.4.5'
    implementation group: 'log4j', name: 'log4j', version: '1.2.17'
    implementation group: 'org.scalaj', name: 'scalaj-http_2.12', version: '2.4.2'
}

scala对象

object SparkModule {
    case class UDTCaseClass(a: Int = 0, b: Float = 0f, c: Int = 0, d: Int = 0)
    case class TableCaseClass(id: UUID, col1: Boolean, list: List[UDTCaseClass])

    val spark = SparkSession.builder
        .master("local[2]")
        .appName("App")
        .config("spark.cassandra.connection.host", "127.0.0.1")
        .config("spark.cassandra.connection.port", "9042")
        .config("spark.executor.cores", "1")
        .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    val cassandraRDD = sc.cassandraTable[TableCaseClass](
        "keyspace", "table"
    ).limit(20)

    println(cassandraRDD.count())
}

起初,有时会显示错误,有时不会,直到我缩小了范围,意识到当udt的任何字段 null ,否则效果很好。例如,如果表包含以下任何一行,则会引发错误:
f39b5201-1e96-44a8-946c-d959c217f174 |假|[{a:123,b:2.3,c:33,d:null}]
f39b5201-1e96-44a8-946c-d959c217f174 |假|[{a:123,b:2.3,c:null,d:34}]
f39b5201-1e96-44a8-946c-d959c217f174 |假|[{a:123,b:null,c:33,d:12}]
f39b5201-1e96-44a8-946c-d959c217f174 |假|[{a:空,b:2.3,c:33,d:22}]
例如,这个:
f39b5201-1e96-44a8-946c-d959c217f174 |假|空
读起来很好 cassandraTable .
我试过用 Option 这样地: case class UDTCaseClass(a: Option[Int] = None, b: Option[Float] = None, c: Option[Int] = None, d: Option[Int] = None) ,但出现相同的错误。
我总是可以插入0而不是 null 但是,这能避免吗?
谢谢

6uxekuva

6uxekuva1#

与spark 2.4.2/scala 2.12和scc 2.5.0配合使用效果良好。
对于以下udt/表格和数据:

CREATE TYPE test.udt (
  id int,
  t1 int,
  t2 int,
  a2 int
);

CREATE TABLE test.u3 (
    id int PRIMARY KEY,
    u list<frozen<udt>>
);
insert into test.u3(id, u) values (5, [{id: 1, t1: 3}]);

下面的scala代码可以正常工作:

case class UDT(id: Int, t1: Int, t2: Option[Int], a2: Option[Int])
case class U3(id: Int, u: List[UDT])

import com.datastax.spark.connector._
val d = sc.cassandraTable[U3]("test", "u3")
d.collect

它返回: Array(U3(5,List(UDT(1,3,None,None)))) 一如预期。
您的错误可能是由于您可能没有重新编译代码,或者它以某种方式被缓存。。。
p、 正如我在评论中指出的,如果您刚刚开始,您更喜欢使用dataframeapi,因为它完全受scc支持。

相关问题