如何使用pyspark执行cql查询

ix0qys7i  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(430)

我想使用pyspark执行cassandra cql查询,但是我没有找到执行它的方法,我可以将整个表加载到dataframe并创建tempview并查询它。

df = spark.read.format("org.apache.spark.sql.cassandra").
        options(table="country_production2",keyspace="country").load()
df.createOrReplaceTempView("Test")

请建议更好的方法,以便我可以在pyspark中执行cql查询。

yhuiod9q

yhuiod9q1#

Spark SQL 不支持Cassandra的 cql 直接说方言。它只允许您将表作为Dataframe加载并对其进行操作。
如果您关心的是读取整个表来查询它,那么您可以使用下面给出的过滤器,让spark推动 predicate ,只加载您需要的数据。

from pyspark.sql.functions import *

df = spark.read\
          .format("org.apache.spark.sql.cassandra")\
          .options(table=table_name, keyspace=keys_space_name)\
          .load()\
          .filter(col("id")=="A")

df.createOrReplaceTempView("Test")
vyu0f0g1

vyu0f0g12#

在pyspark中,您使用的是sql,而不是cql。如果sql查询以某种方式与cql匹配,即您是按分区或主键进行查询,那么spark cassandra connector(scc)将把查询转换成该cql,并执行(所谓的 predicate 下推)。如果不匹配,则spark将通过scc加载所有数据,并在spark级别执行过滤。
因此,注册临时视图后,可以执行以下操作:

val result = spark.sql("select ... from Test where ...")

并在 result 变量。要检查是否发生了 predicate 下推,请执行 result.explain() ,并检查 * 条件中的标记 PushedFilters 部分。

相关问题