df.writeStream
.foreachBatch((batchDF: DataFrame, batchId: Long) =>
batchDF.write
.format("org.apache.spark.sql.cassandra")
.cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
.mode("append")
.save())
.option("checkpointLocation", checkpointDir)
.start()
.awaitTermination()
在代码的最后,dataframe被写入cassandra表。
通过ui检查最后一个阶段后,没有执行save/append data的部分。
我想知道为什么它不存在或者是我错过了什么。
================================更改我的代码后===========================
.writeStream
// .foreachBatch((batchDF: DataFrame, batchId: Long) =>
// batchDF.write
// .format("org.apache.spark.sql.cassandra")
// .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
// .mode("append")
// .save())
.cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
.option("checkpointLocation", checkpointDir)
.start()
.awaitTermination()
但我可以在sql选项卡中看到writetodatasourcev2。
1条答案
按热度按时间nszi6y051#
也许它没有直接回答你的问题,但是对于spark 3.0和scc 3.0.0(你应该使用3.0.0-beta),你不应该使用foreachbatch,但只需通过指定cassandra格式按原样写入数据—因为scc 2.5.0 spark structured streaming本机支持—请参阅公告:https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all