scala—如何将列表拆分为多个分区并发送给执行者

avkwfej4  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(744)

当我们使用spark从csv for db读取数据时,它会自动将数据分割成多个分区并发送给执行者

spark
  .read
  .option("delimiter", ",")
  .option("header", "true")
  .option("mergeSchema", "true")
  .option("codec", properties.getProperty("sparkCodeC"))
  .format(properties.getProperty("fileFormat"))
  .load(inputFile)

目前,我有一个id列表:

[1,2,3,4,5,6,7,8,9,...1000]

我要做的是将这个列表分割成多个分区并发送给执行者,在每个执行者中,按如下方式运行sql

ids.foreach(id => {    
select * from table where id = id
})

当我们从cassandra加载数据时,连接器将生成查询sql,如下所示:

select columns from table where Token(k) >= ? and Token(k) <= ?

这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个表,我只需要从id列表中k(分区键)所在的表中获取所有数据。
表架构为:

CREATE TABLE IF NOT EXISTS tab.events (
    k int,
    o text,
    event text
    PRIMARY KEY (k,o)
);

或者如何使用spark使用预定义的sql语句从cassandra加载数据而不扫描整个表?

lrpiutwd

lrpiutwd1#

你只需要使用 joinWithCassandra 仅执行数据选择的功能是操作所必需的。但是请注意,此函数只能通过RDDAPI使用。
像这样:

val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")

您需要确保Dataframe中的列名与cassandra中的分区键名匹配-有关更多信息,请参阅文档。
dataframe实现只在spark cassandra连接器的dse版本中可用,如以下博客文章所述。
2020年9月更新:spark cassandra connector 2.5.0中添加了对join with cassandra的支持

相关问题