我们有一个叫做cassandra-scan的程序,它使用spark-cassandra-connector在一个非常大的表中列出分区键的所有值。该表有大约1700万个Cassandra分区,每个分区平均有200行。存放该表的Cassandra集群在6个节点上运行DSE 5.1.8。包含该表的键空间的复制因子为3。
下面是密钥空间和表的简化定义。
CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
CREATE TABLE myspace.largetable (
id text,
itemOrder text,
...
PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC)
cassandra-scan中用于列出分区键的所有值的语句如下所示:
val res = sc.cassandraTable(keyspace, table).select("id").perPartitionLimit(1).repartition(320)
我们使用Apache Spark 2.3.1和spark-cassandra-connector 2.3.2。用于启动cassandra-scan的命令如下所示。
/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class "CassandraScan" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &
cassandra-scan运行正常,大约需要19个小时。
我们最近建立了一个新的Cassandra集群,同样有6个节点(与第一个集群中使用的节点不同)。此集群运行DSE 6.8.16。第一个表中的所有数据都已添加到新集群中的表中。
我们将Apache Spark的版本更新为2.4.8,将spark-cassandra-connector更新为2.4.2。我们测试了Spark分区数在2000到200,000之间的程序。我们无法使cassandra-scan正确运行。我们看到以下形式的错误:
java.io.IOException: Exception during execution of SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ? AND token("id") <= ? PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
某些cassandra-scan的运行导致某些Cassandra节点关闭,并在Cassandra日志中显示如下消息。
第一个
任何帮助让这个工作的人都非常感激。谢谢。
2条答案
按热度按时间fnx2tebb1#
此错误指示群集中至少有一个节点无法为请求提供服务:
您需要查看Cassandra日志以确定(1)哪个节点没有响应/不可用,以及(2)原因。干杯!
x7yiwoj42#
我们使用DataStax Bulk Loader来解决这个问题。
DSbulk花费大约3小时来获得1750万个值。