当我们使用spark从csv for db读取数据时,它会自动将数据分割成多个分区并发送给执行者
spark
.read
.option("delimiter", ",")
.option("header", "true")
.option("mergeSchema", "true")
.option("codec", properties.getProperty("sparkCodeC"))
.format(properties.getProperty("fileFormat"))
.load(inputFile)
目前,我有一个id列表:
[1,2,3,4,5,6,7,8,9,...1000]
我要做的是将这个列表分割成多个分区并发送给执行者,在每个执行者中,按如下方式运行sql
ids.foreach(id => {
select * from table where id = id
})
当我们从cassandra加载数据时,连接器将生成查询sql,如下所示:
select columns from table where Token(k) >= ? and Token(k) <= ?
这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个表,我只需要从id列表中k(分区键)所在的表中获取所有数据。
表架构为:
CREATE TABLE IF NOT EXISTS tab.events (
k int,
o text,
event text
PRIMARY KEY (k,o)
);
或者如何使用spark使用预定义的sql语句从cassandra加载数据而不扫描整个表?
1条答案
按热度按时间lrpiutwd1#
你只需要使用
joinWithCassandra
仅执行数据选择的功能是操作所必需的。但是请注意,此函数只能通过RDDAPI使用。像这样:
您需要确保Dataframe中的列名与cassandra中的分区键名匹配-有关更多信息,请参阅文档。
dataframe实现只在spark cassandra连接器的dse版本中可用,如以下博客文章所述。
2020年9月更新:spark cassandra connector 2.5.0中添加了对join with cassandra的支持