为什么cassandra在执行查询时使用“allow filtering”进行计数,而在我的代码中没有提到它?

bqf10yzr  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(391)

我在java8中使用spark-sql-2.4.1、spark-cassandra-connector\u2.11-2.4.1。
我做了如下简单的查询来获得c*表的行数。

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
long recCount = javaFunctions(sc).cassandraTable(keyspace, columnFamilyName).cassandraCount();

但它是超时与以下错误。

java.io.IOException: Exception during execution of SELECT count(*) FROM "radata"."model_vals" WHERE token("model_id", "type", "value", "code") > ? AND token("model_id", "type", "value", "code") <= ?   ALLOW FILTERING: Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:350)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)

Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)

我使用的cassandra 6节点群集具有以下设置:

cassandra.output.consistency.level=ANY
cassandra.concurrent.writes=1500
cassandra.output.batch.size.bytes=2056
cassandra.output.batch.grouping.key=partition 
cassandra.output.batch.grouping.buffer.size=3000
cassandra.output.throughput_mb_per_sec=128
cassandra.connection.keep_alive_ms=30000
cassandra.read.timeout_ms=600000

1) 为什么它在实际执行之前附加“allow filtering”来解释呢?
2) 甚至认为我设置了“cassandra.output.consistency.level=any”为什么它要用“consistency local\u one”执行?
如何解决这些问题?

tmb3ates

tmb3ates1#

allow filtering被spark cassandra connector隐式地添加到生成的cql查询中。
一致性级别是为每个查询设置的,并在spark端设置。你可以用
spark.cassandra.input.consistency.level=任何
在spark配置上。
但是更改一致性级别对您没有帮助,因为只有一个cassandra节点没有响应。我建议您的表非常大,而且cassandra计算count所花费的时间比任何超时参数都多。可以在客户端的每个查询中设置此参数。在您的情况下,您可以查看spark cassandra连接器配置:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md
另一种方法是在spark端计算count,并将.count()而不是.cassandracount()。根据我的经验,我建议在生产上避免Cassandra方面的任何聚合。特别是,当您使用专为此类任务设计的spark-framework时。

相关问题