如何在Cassandra中使用Spark列出一个大的Cassandra表中的所有分区键?

kknvjkwl  于 2022-11-05  发布在  Cassandra
关注(0)|答案(2)|浏览(127)

我们有一个叫做cassandra-scan的程序,它使用spark-cassandra-connector在一个非常大的表中列出分区键的所有值。该表有大约1700万个Cassandra分区,每个分区平均有200行。存放该表的Cassandra集群在6个节点上运行DSE 5.1.8。包含该表的键空间的复制因子为3。
下面是密钥空间和表的简化定义。

CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}  AND durable_writes = true;

CREATE TABLE myspace.largetable (
    id text,
    itemOrder text,
    ...
    PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC)

cassandra-scan中用于列出分区键的所有值的语句如下所示:

val res = sc.cassandraTable(keyspace, table).select("id").perPartitionLimit(1).repartition(320)

我们使用Apache Spark 2.3.1和spark-cassandra-connector 2.3.2。用于启动cassandra-scan的命令如下所示。

/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class "CassandraScan" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &

cassandra-scan运行正常,大约需要19个小时。
我们最近建立了一个新的Cassandra集群,同样有6个节点(与第一个集群中使用的节点不同)。此集群运行DSE 6.8.16。第一个表中的所有数据都已添加到新集群中的表中。
我们将Apache Spark的版本更新为2.4.8,将spark-cassandra-connector更新为2.4.2。我们测试了Spark分区数在2000到200,000之间的程序。我们无法使cassandra-scan正确运行。我们看到以下形式的错误:

java.io.IOException: Exception during execution of SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ? AND token("id") <= ?  PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)

某些cassandra-scan的运行导致某些Cassandra节点关闭,并在Cassandra日志中显示如下消息。
第一个
任何帮助让这个工作的人都非常感激。谢谢。

fnx2tebb

fnx2tebb1#

此错误指示群集中至少有一个节点无法为请求提供服务:

Not enough replicas available for query at consistency LOCAL_ONE \
      (1 required but only 0 alive)

您需要查看Cassandra日志以确定(1)哪个节点没有响应/不可用,以及(2)原因。干杯!

x7yiwoj4

x7yiwoj42#

我们使用DataStax Bulk Loader来解决这个问题。

dsbulk unload \
  --connector.csv.url <path>/<to>/<outputDir> \
  -h <host> \
  -query "select distinct id from myspace.largetable"

DSbulk花费大约3小时来获得1750万个值。

相关问题