从spark中删除cassandra中的特定列

n6lpvg4x  于 2021-06-15  发布在  Cassandra
关注(0)|答案(1)|浏览(450)

我可以用RDDAPI删除特定的列-

sc.cassandraTable("books_ks", "books")
  .deleteFromCassandra("books_ks", "books",SomeColumns("book_price"))

我很难用dataframeapi做到这一点。
有人能举个例子吗?

f4t66c6m

f4t66c6m1#

您不能通过dfapi删除,而且通过rddapi删除是不自然的。rdd和dfs是不可变的,这意味着没有修改。您可以过滤它们以减少它们,但这会生成一个新的rdd/df。
已经说过,您可以做的是过滤掉要删除的行,然后构建一个c客户机来执行删除:
//spark和c
连接导入org.apache.spark.sql.cassandra.\uimport com.datastax.spark.connector.cql.cassandraconnectorconf

spark.setCassandraConf("Test Cluster", CassandraConnectorConf.ConnectionHostParam.option("localhost"))
val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "books_ks", "table" -> "books")).load()
val dfToDelete = df.filter($"price" < 3).select($"price");
dfToDelete.show();

// import for C* client
import com.datastax.driver.core._

// build a C* client (part of the dependency of the scala driver)
val clusterBuilder = Cluster.builder().addContactPoints("127.0.0.1");
val cluster  = clusterBuilder.build();
val session = cluster.connect();

// loop over everything that you filtered in the DF and delete specified row.
for(price <- dfToDelete.collect())
    session.execute("DELETE FROM books_ks.books WHERE price=" + price.get(0).toString);

很少有警告,如果您试图删除一大部分行,这将不起作用。在这里使用collect意味着这项工作将在spark的驱动程序aka spof&bottleneck中完成。
更好的方法是定义一个df udf来执行删除,这样做的好处是你可以得到并行化。选项b)到rdd级别,只需删除上面显示的内容。
这个故事的寓意,仅仅因为它可以做到,并不意味着它应该做到。

相关问题