cassandra 为什么Apache Spark在客户端上执行过滤器

xjreopfe  于 2022-11-05  发布在  Cassandra
关注(0)|答案(3)|浏览(151)

作为Apache Spark的新手,在Spark上获取Cassandra数据时遇到了一些问题。

List<String> dates = Arrays.asList("2015-01-21","2015-01-22");
CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc).
                    cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)).
                    where("Id=? and date IN ?","Open",dates);

此查询没有在cassandra服务器上过滤数据。当此java语句执行时,它占用了内存,最后引发了spark java.lang.OutOfMemoryError异常。查询应该在cassandra服务器上过滤数据,而不是如https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md中所述的客户端。
当我在cassandra cqlsh上执行带有过滤器的查询时,它的性能很好,但是在没有过滤器(where子句)的情况下执行查询时,会出现预期的超时。因此,很明显spark没有在客户端应用过滤器。

SparkConf conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[8]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15")

为什么要在客户端应用筛选器,以及如何改进以在服务器端应用筛选器。
我们如何在windows平台上配置cassandra集群之上的spark集群?

hts6caw3

hts6caw31#

没有使用Cassandra与Spark,从阅读您提供的部分(谢谢),我看到:
注意:虽然ALLOW FILTERING子句已隐含新增至产生的CQL查询,但Cassandra引擎目前并不允许所有述词。这个限制将在未来的Cassandra版本中解决。目前,ALLOW FILTERING适用于由次要索引或丛集数据栏建立索引的数据栏。
我非常肯定(但还没有测试)不支持“IN” predicate :请访问https://github.com/datastax/spark-cassandra-connector/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80
因此,您可以尝试将where子句限制为Id(假设有一个二级索引),并对日期范围使用spark过滤。

rbl8hiat

rbl8hiat2#

我建议将表作为DataFrame而不是RDD读入,这些在Spark 1.3或更高版本中可用,然后您可以将CQL查询指定为如下字符串:

CassandraSQLContext sqlContext = new CassandraSQLContext(sc);

String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')";
DataFrame resultsFrame = sqlContext.sql(query);

System.out.println(resultsFrame.count());

所以试试看,看看它是否对你更有效。
一旦你在DataFrame中有了数据,你就可以在它上面运行Spark SQL操作。如果你想把数据放在RDD中,你可以把DataFrame转换成RDD。

u4vypkhs

u4vypkhs3#

在SparkConfing中设置spark.cassandra.input.split.size_in_mb解决了这个问题。

conf = new SparkConf();
        conf.setAppName("Test");
        conf.setMaster("local[4]");
        conf.set("spark.cassandra.connection.host", "192.168.1.15").
        set("spark.executor.memory", "2g").
        set("spark.cassandra.input.split.size_in_mb", "67108864");

Spark-cassnadra-connector读取了错误的spark.cassandra.input.split.size_in_mb值,因此在SparkConf中覆盖此值即可完成工作。现在IN子句也可以正常工作。

相关问题