使用Cassandra的ScalaSpark过滤器RDD

qv7cva1a  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(129)

我是Spark-Cassandra和Scala新手。我有一个现有的RDD。假设:
((url_hash、URL、创建的_时间戳))。
我想基于url_hash过滤这个RDD。如果url_hash存在于Cassandra表中,那么我想把它从RDD中过滤出来,这样我就可以只对新的url进行处理。
Cassandra表如下所示:

url_hash| url | created_timestamp | updated_timestamp

任何指针都会很棒。
我试过这样的方法:

case class UrlInfoT(url_sha256: String, full_url: String, created_ts: Date)
   def timestamp = new java.utils.Date()
   val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp)))
   val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts")
   val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts)))
   newUrlsRDD = rdd1.subtractByKey(rdd3)

我收到 cassandra 错误

java.lang.NullPointerException: Unexpected null value of column full_url in      keyspace.url_info.If you want to receive null values from Cassandra, please wrap the column type into Option or use JavaBeanColumnMapper

cassandra表中没有空值

332nm8kg

332nm8kg1#

谢谢原型保罗!
我希望有人觉得这个有用。不得不向case类添加Option。
期待更好的解决方案

case class UrlInfoT(url_sha256: String, full_url: Option[String], created_ts: Option[Date])

def timestamp = new java.utils.Date()
val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp)))
val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace",   "url_info").select("url_sha256", "full_url", "created_ts")
val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts)))
newUrlsRDD = rdd1.subtractByKey(rdd3)

相关问题