数据源org.apache.spark.sql.cassandra不支持流式读取
val spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.config("spark.cassandra.auth.username", "xxxxx")
.config("spark.cassandra.auth.password", "yyyyy")
.master("local[*]")
.getOrCreate();
val tableDf3 = spark.**readStream**
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "aaaaa", "keyspace" -> "bbbbb"))
.load()
.filter("deviceid='XYZ'")
tableDf3.show(10)
1条答案
按热度按时间ngynwnxp1#
正确-spark cassandra连接器只能用作流接收器,不能用作流源。
如果您想从cassandra获得更改,那么这是一个相当复杂的任务,这取决于cassandra的版本(它是否实现了cdc)和其他因素。
对于spark,可以通过定期重新读取数据来实现某种流,使用timestamp列过滤掉已经读取的数据。您可以在下面的答案中找到有关该方法的更多信息。