给定架构:
CREATE TABLE keyspace.table (
key text,
ckey text,
value text
PRIMARY KEY (key, ckey)
)
…和spark伪代码:
val sc: SparkContext = ...
val connector: CassandraConnector = ...
sc.cassandraTable("keyspace", "table")
.mapPartitions { partition =>
connector.withSessionDo { session =>
partition.foreach { row =>
val key = row.getString("key")
val ckey = Random.nextString(42)
val value = row.getString("value")
session.execute(s"INSERT INTO keyspace.table (key, ckey, value)" +
" VALUES ($key, $ckey, $value)")
}
}
}
这样的代码是否可以在单个应用程序(spark作业)运行时读取插入的值?我的问题的更一般化的版本是,令牌范围扫描cql查询是否可以在遍历行时读取新插入的值。
1条答案
按热度按时间t40tm48m1#
拉多是的,这是可能的,正如亚历克斯写的,但我不认为这与上述代码是可能的
因此,对于每个数据模型,表由ckey按升序排序
然而,有趣的是页面大小和预取的页面数量,因为默认情况下这是1000(spark.cassandra.input.fetch.sizeinrows),那么如果您不使用42,但使用更大的值和/或执行器还没有分页,那么唯一的问题可能会出现
另外,我认为您使用了不必要的嵌套,因此实现所需的代码可能会简化(毕竟cassandratable将为您提供一个Dataframe)。
(我希望我理解您希望读取每个分区(请注意,在您的示例中,分区是一个主键(“key”)下的所有行,并且对于该分区中的每一行(由ckey区分)生成一个新的(用new ckey只会用new ckey复制值)-这种代码的用例对我来说是个谜,但我希望它有一些意义:-)
我的手指