rdd中的scala-spark-cassandra表滤波器

wrrgggsh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(631)

我必须在spark中过滤cassandra表,通过spark从表中获取数据后,在返回的rdd上应用filter函数,我们不想在cassandra api中使用where子句进行过滤,但是需要在filter列上定制sasi索引,这会由于cassandra中的多个ss表扫描而导致磁盘开销问题。例如:

val ct = sc.cassandraTable("keyspace1", "table1")
val fltr = ct.filter(x=x.contains "zz")

表1字段为:
脏的
文件名文本
事件int
事件时间戳bigint
文件ID int
文件类型int
基本上,我们需要根据文件名和任意字符串来过滤数据。因为返回的rdd是 com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD 过滤操作仅限于 CassandraRow 在此处输入图像描述

val ct = sc.cassandraTable("keyspace1", "table1")
    scala> ct
    res140: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[171] at RDD at CassandraRDD.scala:19

当我在下面的filter函数中点击“x.”后的tab时,它显示了cassandrarow类的以下方法`在这里输入代码

scala> ct.filter(x=>x.
columnValues   getBooleanOption   getDateTime         getFloatOption   getLongOption    getString             getUUIDOption     length
contains       getByte            getDateTimeOption   getInet          getMap           getStringOption       getVarInt         metaData
copy           getByteOption      getDecimal          getInetOption    getRaw           getTupleValue         getVarIntOption   nameOf
dataAsString   getBytes           getDecimalOption    getInt           getRawCql        getTupleValueOption   hashCode          size
equals         getBytesOption     getDouble           getIntOption     getSet           getUDTValue           indexOf           toMap
get            getDate            getDoubleOption     getList          getShort         getUDTValueOption     isNullAt          toString
getBoolean     getDateOption      getFloat            getLong          getShortOption   getUUID               iterator
jtoj6r0c

jtoj6r0c1#

您需要从 CassandraRow 对象,然后对其执行筛选。因此,此代码如下所示:

val fltr = ct.filter(x => x.getString("filename").contains("zz"))

相关问题