AbstractMethodError when connecting to an Azure Cosmos DB Cassandra API table from an Azure Databricks job

az31mfrm asked on 2023-02-13 in Cassandra

I am trying to write data to a Cassandra table (Azure Cosmos DB Cassandra API) from an Azure Databricks job using Spark Structured Streaming, and the query fails with the exception below.

Query [id = , runId = ] terminated with exception: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract

Caused by: IOException: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: AbstractMethodError: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract

Steps I took to get here:

  • Created a Cosmos DB account
  • Created a Cassandra keyspace
  • Created a Cassandra table
  • Created a Databricks job
  • Added com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 to the job cluster
  • Added com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 to the job cluster (see the dependency sketch below)
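
For reference, the same two cluster libraries expressed as sbt dependencies would look roughly like this (sbt is only an illustration; on the job cluster they were attached as Maven libraries, not built locally):

// build.sbt -- hypothetical local equivalent of the two Databricks cluster libraries
libraryDependencies ++= Seq(
  "com.datastax.spark" % "spark-cassandra-connector-assembly_2.12" % "3.2.0",
  "com.microsoft.azure.cosmosdb" % "azure-cosmos-cassandra-spark-helper" % "1.2.0"
)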

What I tried:
Different versions of the connector and of the Azure Cosmos DB helper library, but each combination failed with some ClassNotFoundException or method-not-found error.
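
When cycling through connector versions like this, one quick sanity check is to print which JAR a connector class was actually loaded from. This is a generic JVM trick, not something from the original post:

// Prints the JAR that CassandraConnector was loaded from, which helps spot
// version clashes behind ClassNotFoundException / AbstractMethodError.
import com.datastax.spark.connector.cql.CassandraConnector
Option(classOf[CassandraConnector].getProtectionDomain.getCodeSource)
  .map(_.getLocation)
  .foreach(println)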
Code snippet:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra._
import java.time.LocalDateTime

def writeDelta(spark: SparkSession, dataFrame: DataFrame, sourceName: String, checkpointLocation: String, dataPath: String, loadType: String, log: Logger): Boolean = {
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.connection.localConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keepAliveMS", "60000000") // increase this number as needed
    spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
    spark.conf.set("spark.cassandra.connection.host", "*******.cassandra.cosmosdb.azure.com")
    spark.conf.set("spark.cassandra.connection.port", "10350")
    spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
    // spark.cassandra.auth.username and password are set in the cluster conf

    // Note: with foreachBatch the format/options below are ignored by the sink;
    // the actual Cassandra write happens inside upsertToDelta.
    val write = dataFrame.writeStream
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "****", "keyspace" -> "****"))
      .foreachBatch(upsertToDelta _)
      .outputMode("update")
      .option("mergeSchema", "true")
      .option("mode", "PERMISSIVE")
      .option("checkpointLocation", checkpointLocation)
      .start()
    write.awaitTermination()
    true // explicit value for the declared Boolean return type
}

  def upsertToDelta(newBatch: DataFrame, batchId: Long): Unit = {

    try {
      val spark = SparkSession.active
      println(LocalDateTime.now())
      println("BATCH ID = " + batchId + " REC COUNT = " + newBatch.count())
      newBatch.persist()
      // keyColumn and timestampCol are defined elsewhere in the job
      val userWindow = Window.partitionBy(keyColumn).orderBy(col(timestampCol).desc)
      // keep only the latest record per key within the micro-batch
      val deDup = newBatch.withColumn("rank", row_number().over(userWindow)).where(col("rank") === 1).drop("rank")

      deDup.write
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "****", "keyspace" -> "****"))
        .mode("append")
        .save()

      newBatch.unpersist()
    } catch {
      case e: Exception =>
        throw e
    }
  }
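
For context, a hypothetical invocation of writeDelta might look like this; the paths, names, and logger below are illustrative assumptions, not from the original job:

// Hypothetical call site for writeDelta; all literals are placeholders.
val spark = SparkSession.builder().appName("cosmos-cassandra-stream").getOrCreate()
val source = spark.readStream.format("delta").load("/mnt/raw/events")
writeDelta(spark, source,
  sourceName = "events",
  checkpointLocation = "/mnt/checkpoints/events",
  dataPath = "/mnt/raw/events",
  loadType = "incremental",
  log = Logger.getLogger("writeDelta"))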

############################

After applying the solution @theo-van-kraay suggested, the executor logs now show the error below (the job keeps running despite it):

23/02/13 07:28:55 INFO CassandraConnector: Connected to Cassandra cluster.
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Committed partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO Executor: Finished task 9.0 in stage 6.0 (TID 26). 1511 bytes result sent to driver
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 7 (task 24, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 18, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 3 (task 20, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 5 (task 22, attempt 0, stage 6.0)
23/02/13 07:28:56 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Unable to get Token Metadata
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$tokenMap$1(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.tokenMap(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.replicasForRoutingKey$1(LocalNodeFirstLoadBalancingPolicy.scala:103)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$8(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$7(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$3(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    ...
    ...

23/02/13 07:28:56 ERROR Utils: Aborting task

Answer 1 (wlzqhblo):

You can remove:

com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0

It is not needed with the Spark 3 Cassandra connector; it was built for Spark 2 only. That matches the AbstractMethodError above: the helper's CosmosDbConnectionFactory was compiled against the Spark 2 connector's connection-factory interface, so it never implemented the CqlSession-returning createSession method that connector 3.x expects. Remove the references to it from your code as well.
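
Once the helper is gone, the connector 3.x settings already shown in the question are sufficient on their own. A minimal sketch, reusing the endpoint, port, and cluster-side credentials from the question (no spark.cassandra.connection.factory override is needed):

// Spark 3 Cassandra connector talking to Cosmos DB Cassandra API directly;
// only com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 is attached.
spark.conf.set("spark.cassandra.connection.host", "<name>.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
// spark.cassandra.auth.username / spark.cassandra.auth.password stay in the cluster conf

// Quick read to verify connectivity (table/keyspace masked as in the question):
val probe = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "****", "keyspace" -> "****"))
  .load()
println(probe.count())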
