我有2个cassandra示例(本地独立),我已经创建了3个节点的集群表
CREATE TABLE dev.test (
a text,
b int,
c text,
PRIMARY KEY ((a, b), c)) WITH CLUSTERING ORDER BY (c ASC)
我已输入数据
a | b | c
---+---+---
A | 1 | C
B | 2 | D
我连接到Spark壳使用下面的命令。对于集群,它是集群中某台机器的i/p。下面是单独的命令
./spark-shell --conf spark.cassandra.connection.host=127.0.0.1 --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._
scala> case class PrimaryKey(a: String,b: Int)
defined class PrimaryKey
scala> val idsOfInterest = sc.sparkContext.parallelize(Seq(PrimaryKey("B",2)))
idsOfInterest: org.apache.spark.rdd.RDD[PrimaryKey] = ParallelCollectionRDD[0] at parallelize at <console>:29
scala> val repartitioned = idsOfInterest.repartitionByCassandraReplica("dev", "test" )
repartitioned: com.datastax.spark.connector.rdd.partitioner.CassandraPartitionedRDD[PrimaryKey] = CassandraPartitionedRDD[6] at RDD at CassandraPartitionedRDD.scala:18
现在我在我的本地电脑上看到了这一步的不同**
scala> repartitioned.partitions
res1: Array[org.apache.spark.Partition] = Array(ReplicaPartition(0,[Ljava.lang.String;@6d4693ee), ReplicaPartition(1,[Ljava.lang.String;@75038f62), ReplicaPartition(2,[Ljava.lang.String;@31bb5652), ReplicaPartition(3,[Ljava.lang.String;@77b46743), ReplicaPartition(4,[Ljava.lang.String;@2eb453b8), ReplicaPartition(5,[Ljava.lang.String;@4725a193), ReplicaPartition(6,[Ljava.lang.String;@2c4766e3), ReplicaPartition(7,[Ljava.lang.String;@781d9257), ReplicaPartition(8,[Ljava.lang.String;@7438377f), ReplicaPartition(9,[Ljava.lang.String;@1e7b3952))
连接到群集时执行的步骤相同(接触点之一)
scala> repartitioned.partitions
res0: Array[org.apache.spark.Partition] = Array()
因此,当我加入集群上的cassandratable时,它不会返回任何记录
我需要在我的集群上进行其他配置吗?
如果我登录到任何一个cassandra机器并执行这些步骤,更新同样的工作。然而,当我从我的Spark簇大师那里跑出来的时候,它就不起作用了
暂无答案!
目前还没有任何答案,快来回答吧!