使用spark elasticsearch连接器可以直接从es向spark加载所需的列。然而,似乎不存在这样一个直接的选项来做同样的,使用SparkCassandra连接器
将es中的数据读取到spark中--此处仅将es中所需的列带到spark中:
spark.conf.set('es.nodes', ",".join(ES_CLUSTER))
es_epf_df = spark.read.format("org.elasticsearch.spark.sql") \
.option("es.read.field.include", "id_,employee_name") \
.load("employee_0001") \
将数据从cassandra读入spark——这里所有列的数据都被带到spark,然后应用select来拉取感兴趣的列:
spark.conf.set('spark.cassandra.connection.host', ','.join(CASSANDRA_CLUSTER))
cass_epf_df = spark.read.format('org.apache.spark.sql.cassandra') \
.options(keyspace="db_0001", table="employee") \
.load() \
.select("id_", "employee_name")
对Cassandra也可以这样吗?如果是,那么怎么做。如果没有,那为什么不呢。
2条答案
按热度按时间a0x5cqrl1#
你写的代码已经在这么做了。您在加载后编写了select,您可能认为首先会拉取所有列,然后过滤所选列,但事实并非如此。
假设:
select * from db_0001.employee;
实际值:select id_, employee_name from db_0001.employee;
spark将理解您需要的列,并只查询cassandra数据库中的列。此功能称为 predicate 下推。这不仅仅局限于cassandra,很多源代码都支持这个特性(这是spark的特性,而不是cassandra)。更多信息:https://docs.datastax.com/en/dse/6.7/dse-dev/datastax_enterprise/spark/sparkpredicatepushdown.html
acruukt92#
实际上,连接器应该自己做,而不需要显式设置任何东西,它被称为“ predicate 下推”,cassandra连接器就是这样做的,根据文档:
连接器将自动将所有有效 predicate 下推到cassandra。数据源还将自动仅从cassandra中选择完成查询所需的列。这可以通过explain命令进行监视。
资料来源:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md