Spark streaming Dataset to Cassandra: UnsupportedOperationChecker error

Posted by 92dk7w1h on 2021-05-27 in Spark

I'm trying to write my streaming Dataset to Cassandra.
I have a streaming Dataset of the following class:

case class UserSession(var id: Int,
                       var visited: List[String]
                      )

I also have the following keyspace/table in Cassandra (blog = keyspace, session = table):

CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy',    'replication_factor' : 1 };

CREATE TABLE blog.session(id int PRIMARY KEY, visited list<text>);

I chose list<text> because my visited field is a List[String]. My writer is as follows:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter

class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {

  /*
    - on every batch, on every partition `partitionId`
      - on every "epoch" = chunk of data
        - call the open method; if false, skip this chunk
        - for each entry in this chunk, call the process method
        - call the close method either at the end of the chunk or with an error if it was thrown
   */

  val keyspace = "blog"
  val table = "session"
  val connector = CassandraConnector(sparkSession.sparkContext.getConf)

  override def open(partitionId: Long, epochId: Long): Boolean = {
    println("Open connection")
    true
  }

  override def process(sess: UserSession): Unit = {
    connector.withSessionDo { session =>
      session.execute(
        s"""
           |insert into $keyspace.$table("id")
           |values (${sess.id},${sess.visited})
         """.stripMargin)
    }
  }

  override def close(errorOrNull: Throwable): Unit = println("Closing connection")

}
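Separately from the analysis error below, the process method would also fail once rows reached it: the column list names only "id" while two values are supplied, and ${sess.visited} interpolates Scala's List.toString (for example List(home, blog)), which is not a CQL list literal. The sketch below is illustrative only; the helpers cqlText, cqlList and toCqlInsert and the object CqlInsertSketch are hypothetical names, not part of Spark or the connector. In real code a prepared statement with bound values avoids hand-quoting altogether.

```scala
// Hypothetical helpers for illustration only; not part of the Spark or
// spark-cassandra-connector APIs.
object CqlInsertSketch {
  // Mirrors the question's case class.
  case class UserSession(var id: Int, var visited: List[String])

  // Quote a string as a CQL text literal; a single quote is escaped by doubling it.
  def cqlText(s: String): String = "'" + s.replace("'", "''") + "'"

  // Render a Scala List[String] as a CQL list<text> literal, e.g. ['a', 'b'].
  def cqlList(xs: List[String]): String = xs.map(cqlText).mkString("[", ", ", "]")

  // Name both columns so the column list matches the two supplied values.
  def toCqlInsert(keyspace: String, table: String, sess: UserSession): String =
    s"INSERT INTO $keyspace.$table (id, visited) VALUES (${sess.id}, ${cqlList(sess.visited)})"

  def main(args: Array[String]): Unit = {
    val sess = UserSession(1, List("home", "it's"))
    // What the original interpolation produces: Scala syntax, not CQL.
    println(s"${sess.visited}")
    // A well-formed CQL statement for the same row.
    println(toCqlInsert("blog", "session", sess))
  }
}
```

Running main prints List(home, it's) and then INSERT INTO blog.session (id, visited) VALUES (1, ['home', 'it''s']).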

It may help to look at my process function, since that may be what throws the error. My main is as follows
(finishedUserSessionsStream: Dataset[UserSession]):

def main(args: Array[String]): Unit = {
/// make finishedUserSessionStreams.....

finishedUserSessionsStream.writeStream
      .option("checkpointLocation", "checkpoint")
      .foreach(new SessionCassandraForeachWriter)
      .start()
      .awaitTermination()

}

This gives me the following error:
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)

Answer by mlmc2os5:

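With Spark 3.0 and Spark Cassandra Connector (SCC) 3.0.0 you shouldn't use foreach; that was a workaround for SCC < 2.5.0, which didn't support writing streaming Datasets. Starting with SCC 2.5.0 you can write data directly to Cassandra, like this (excerpt from a full example):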

import org.apache.spark.sql.streaming.OutputMode

val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "checkpoint")
      .option("keyspace", "ks")
      .option("table", "table")
      .start()
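Applied to the question's schema, the same direct-write approach might look like the following. This is a hedged sketch, not the answerer's code: the object name SessionsToCassandra is hypothetical, the built-in "rate" source stands in for finishedUserSessionsStream, the connection host 127.0.0.1 is an assumption, and Append mode is used because this stand-in stream has no aggregation (the excerpt above uses Update; keep whatever mode your real query needs). It assumes spark-sql and the spark-cassandra-connector are on the classpath and that blog.session exists as created in the question.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{array, col, lit}
import org.apache.spark.sql.streaming.OutputMode

// Top-level case class so Spark can derive an Encoder for it.
case class UserSession(id: Int, visited: List[String])

object SessionsToCassandra {
  // Sink options: keyspace and table match the CQL schema in the question.
  val sinkOptions: Map[String, String] = Map(
    "keyspace" -> "blog",
    "table" -> "session",
    "checkpointLocation" -> "checkpoint"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sessions-to-cassandra")
      // Assumption: a local Cassandra node; adjust for your cluster.
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for finishedUserSessionsStream: the "rate" source emits
    // (timestamp, value) rows, mapped here to UserSession rows.
    val finishedUserSessionsStream: Dataset[UserSession] = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select(
        (col("value") % 100).cast("int").as("id"),
        array(lit("/home"), lit("/blog")).as("visited"))
      .as[UserSession]

    // Write directly through the connector's data source; no ForeachWriter needed.
    finishedUserSessionsStream.writeStream
      .format("org.apache.spark.sql.cassandra")
      .outputMode(OutputMode.Append)
      .options(sinkOptions)
      .start()
      .awaitTermination()
  }
}
```

The Dataset's array<string> column maps onto the list<text> column, so no manual CQL string building is involved.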

You also need to switch to the SCC 3.0.0-beta release, which contains many fixes.
