使用Spark Java直接在Cassandra上过滤

vmjh9lq9  于 2023-02-22  发布在  Cassandra
关注(0)|答案(1)|浏览(193)

我想直接在Cassandra DB上过滤数据,并使用Spark Java加载过滤后的数据。到目前为止,我只找到了用于选择行的.select()和用于过滤数据的.filter(),**在将其加载为数据集之后。
这是到目前为止的代码片段:

SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("")
                .set("spark.cassandra.connection.host", "localhost")
                .set("spark.cassandra.auth.username", "cassandra")
                .set("spark.cassandra.auth.password", "cassandra")
                .set("spark.cassandra.output.consistency.level", "ONE");

SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

Dataset<Row> dataset = spark
                .read()
                .format("org.apache.spark.sql.cassandra")
                .options(ImmutableMap.of("table", "my_table", "keyspace", "my_keyspace"))
                .select("col1", "col2")
                .load();

dataset = dataset.filter("col1 > 9");

有人能在加载数据之前直接帮助应用过滤器吗?

t9aqgxwy

t9aqgxwy1#

Spark沿着Cassandra Connector会自动阅读与作业相关的数据,这是通过 predicate 下推和列修剪来完成的,您可以在物理计划中检查相同的内容(使用explain)。
这意味着Spark只会加载完成手头任务所需的数据(这是Lazy Evaluation的好处之一,因为Spark可以调优整体操作)
另请检查this answer以了解更多详细信息
参考:同品种器械下推和列修剪

相关问题