我使用的是sparksql2.4.1、spark-cassandra-connector_2.11-2.4.1.jar和java8。我有一个情况,为了审计的目的,我需要计算ctable的表行数。我的c表中有大约20亿条记录。
为了计算行数,我尝试了两种方法,如下所示。
public static Long getColumnFamilyCountJavaApi(SparkSession spark,String keyspace, String columnFamilyName) throws IOException{
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
return javaFunctions(sc).cassandraTable(keyspace, columnFamilyName).cassandraCount();
}
public static Long getColumnFamilyCount(SparkSession spark,String keyspace, String columnFamilyName) throws IOException{
return spark
.read()
.format("org.apache.spark.sql.cassandra")
.option("table", columnFamilyName)
.option("keyspace",keyspace )
.load().count();
}
但两种方法都会导致相同的错误。
Caused by: com.datastax.driver.core.exceptions.ReadFailureException: Cassandra failure during read query at consistency LOCAL_QUORUM (2 responses were required but only 0 replica responded, 2 failed)
at com.datastax.driver.core.exceptions.ReadFailureException.copy(ReadFailureException.java:85)
com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
at com.datastax.spark.connector.cql.DefaultScanner.scan(Scanner.scala:34)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:342)
如何处理这种情况?
1条答案
按热度按时间xvw2m8pv1#
该错误堆栈是节点的读取超时。这实际上可能是由于许多原因。我不回答这个错误,而是要回答你的最终目标是什么。
您正在尝试在cassandra中计算表中的行数。
虽然这不是一个不合理的要求,但对Cassandra来说,这是一个有点棘手的主题。这是因为计数是群集范围的。请看这篇相当不错的博客文章,解释为什么会这样。
我可以看到你在这里使用Spark,所以你可能已经意识到,在cqlsh计数可能是昂贵的。你可能想看一看学院的录像
cassandraCount
另请参阅spark connector文档您可能还对dsbulk工具感兴趣。我已经成功地将这个工具用于许多事情,从大型数据迁移到小型工作(如计数等),请参阅这里的dsbulk文档
希望这对你有所帮助!