我想直接在Cassandra DB上过滤数据,并使用Spark Java加载过滤后的数据。到目前为止,我只找到了用于选择行的.select()
和用于过滤数据的.filter()
,**在将其加载为数据集之后。
这是到目前为止的代码片段:
SparkConf sparkConf = new SparkConf()
.setMaster("local")
.setAppName("")
.set("spark.cassandra.connection.host", "localhost")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.output.consistency.level", "ONE");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> dataset = spark
.read()
.format("org.apache.spark.sql.cassandra")
.options(ImmutableMap.of("table", "my_table", "keyspace", "my_keyspace"))
.select("col1", "col2")
.load();
dataset = dataset.filter("col1 > 9");
有人能在加载数据之前直接帮助应用过滤器吗?
1条答案
按热度按时间t9aqgxwy1#
Spark沿着Cassandra Connector会自动阅读与作业相关的数据,这是通过 predicate 下推和列修剪来完成的,您可以在物理计划中检查相同的内容(使用explain)。
这意味着Spark只会加载完成手头任务所需的数据(这是Lazy Evaluation的好处之一,因为Spark可以调优整体操作)
另请检查this answer以了解更多详细信息
参考:同品种器械下推和列修剪