spark.setCassandraConf("Test Cluster", CassandraConnectorConf.ConnectionHostParam.option("localhost"))
val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "books_ks", "table" -> "books")).load()
val dfToDelete = df.filter($"price" < 3).select($"price");
dfToDelete.show();
// import for C* client
import com.datastax.driver.core._
// build a C* client (part of the dependency of the scala driver)
val clusterBuilder = Cluster.builder().addContactPoints("127.0.0.1");
val cluster = clusterBuilder.build();
val session = cluster.connect();
// loop over everything that you filtered in the DF and delete specified row.
for(price <- dfToDelete.collect())
session.execute("DELETE FROM books_ks.books WHERE price=" + price.get(0).toString);
1条答案
按热度按时间f4t66c6m1#
您不能通过dfapi删除,而且通过rddapi删除是不自然的。rdd和dfs是不可变的,这意味着没有修改。您可以过滤它们以减少它们,但这会生成一个新的rdd/df。
已经说过,您可以做的是过滤掉要删除的行,然后构建一个c客户机来执行删除:
//spark和c连接导入org.apache.spark.sql.cassandra.\uimport com.datastax.spark.connector.cql.cassandraconnectorconf
很少有警告,如果您试图删除一大部分行,这将不起作用。在这里使用collect意味着这项工作将在spark的驱动程序aka spof&bottleneck中完成。
更好的方法是定义一个df udf来执行删除,这样做的好处是你可以得到并行化。选项b)到rdd级别,只需删除上面显示的内容。
这个故事的寓意,仅仅因为它可以做到,并不意味着它应该做到。