spark:每日从cassandra阅读并保存到parquets,如何只阅读新行?

5vf7fwbs  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(421)

我正在尝试用spark构建一个etl过程。我的目标是读取Cassandra表和保存到Parquet文件。
到目前为止,我所做的是使用一个cassandra连接器(在pyspark中)从cassandra读取整个表:

df = app.sqlSparkContext.read.format("org.apache.spark.sql.cassandra")\
        .option("table", my_table)\
        .option("keyspace",my_keyspace)\
        .load()

问题是我的数据增长很快,我想每天重复etl过程,从cassandra读取新添加的行,并将它们保存到新的parquet文件中。
有没有在我的Cassandra表没有秩序,我将无法阅读的基础上的时间,有没有任何方法来做它从Spark方面呢?

2nbm6dog

2nbm6dog1#

只有当您有基于时间的第一个聚类列时,基于时间的有效过滤才是可能的,如下所示:

create table test.test (
  pk1 <type>,
  pk2 <type>,
  cl1 timestamp,
  cl2 ...,
  primary key ((pk1, pk2), cl1, cl2));

在这种情况下,条件是 cl1 ,如下所示:

import org.apache.spark.sql.cassandra._
val data = { spark.read.cassandraFormat("test", "test").load()}
val filtered = data.filter("cl1 >= cast('2019-03-10T14:41:34.373+0000' as timestamp)")

将有效地推入cassandra,并且过滤将发生在服务器端,只检索必要的数据-这很容易用explain检查-它应该生成这样的内容(pushed filter表示为 * ):

// *Filter ((cl1#23 >= 1552228894373000))
// +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk1#21,pk2#22L,cl1#23,...] 
PushedFilters: [*GreaterThanOrEqual(cl1,2019-03-10 14:41:34.373)], 
ReadSchema: struct<pk1:int,pk2:int,cl1:timestamp,...

在所有其他情况下,过滤将发生在spark端,从cassandra检索所有数据。

相关问题