我正在尝试用spark构建一个etl过程。我的目标是读取Cassandra表和保存到Parquet文件。
到目前为止,我所做的是使用一个cassandra连接器(在pyspark中)从cassandra读取整个表:
df = app.sqlSparkContext.read.format("org.apache.spark.sql.cassandra")\
.option("table", my_table)\
.option("keyspace",my_keyspace)\
.load()
问题是我的数据增长很快,我想每天重复etl过程,从cassandra读取新添加的行,并将它们保存到新的parquet文件中。
有没有在我的Cassandra表没有秩序,我将无法阅读的基础上的时间,有没有任何方法来做它从Spark方面呢?
1条答案
按热度按时间2nbm6dog1#
只有当您有基于时间的第一个聚类列时,基于时间的有效过滤才是可能的,如下所示:
在这种情况下,条件是
cl1
,如下所示:将有效地推入cassandra,并且过滤将发生在服务器端,只检索必要的数据-这很容易用explain检查-它应该生成这样的内容(pushed filter表示为
*
):在所有其他情况下,过滤将发生在spark端,从cassandra检索所有数据。