I am trying to write data to a Cassandra table (Cosmos DB Cassandra API) from an Azure DBR job (Spark Streaming), and the query fails with:
Query [id = , runId = ] terminated with exception: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: IOException: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: AbstractMethodError: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
What I did to get here:
- Created a Cosmos DB account
- Created a Cassandra keyspace
- Created a Cassandra table
- Created a DBR job
- Added com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 to the job cluster
- Added com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 to the job cluster
What I tried:
Different versions of the connector and of the Azure Cosmos DB helper library, but every combination fails with some ClassNotFoundException or MethodNotFound error.
Code snippet:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra._
import java.time.LocalDateTime
def writeDelta(spark: SparkSession, dataFrame: DataFrame, sourceName: String, checkpointLocation: String, dataPath: String, loadType: String, log: Logger): Boolean = {
  spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
  spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
  spark.conf.set("spark.cassandra.connection.localConnectionsPerExecutor", "10")
  spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
  spark.conf.set("spark.cassandra.concurrent.reads", "512")
  spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
  spark.conf.set("spark.cassandra.connection.keepAliveMS", "60000000") // Increase this number as needed
  spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
  spark.conf.set("spark.cassandra.connection.host", "*******.cassandra.cosmosdb.azure.com")
  spark.conf.set("spark.cassandra.connection.port", "10350")
  spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
  // spark.cassandra.auth.username and password are set in cluster conf
  val write = dataFrame.writeStream
    // Note: once foreachBatch is set, it becomes the sink, so the
    // format/options below are not used for the actual write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "****", "keyspace" -> "****"))
    .foreachBatch(upsertToDelta _)
    .outputMode("update")
    .option("mergeSchema", "true")
    .option("mode", "PERMISSIVE")
    .option("checkpointLocation", checkpointLocation)
    .start()
  write.awaitTermination()
  true // declared return type is Boolean
}
def upsertToDelta(newBatch: DataFrame, batchId: Long): Unit = {
  try {
    val spark = SparkSession.active
    println(LocalDateTime.now())
    println("BATCH ID = " + batchId + " REC COUNT = " + newBatch.count())
    newBatch.persist()
    // keyColumn and timestampCol are defined elsewhere in the job
    val userWindow = Window.partitionBy(keyColumn).orderBy(col(timestampCol).desc)
    // keep only the newest record per key
    val deDup = newBatch.withColumn("rank", row_number().over(userWindow))
      .where(col("rank") === 1)
      .drop("rank")
    deDup.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "****", "keyspace" -> "****"))
      .mode("append")
      .save()
    newBatch.unpersist()
  } catch {
    case e: Exception =>
      throw e
  }
}
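For clarity, the deduplication step above implements a "latest record per key" rule: within each key partition, only the row with the greatest timestamp survives. The same rule can be sketched in plain Scala collections (no Spark); `Record`, `key`, `ts`, and `value` here are illustrative names, not part of the actual job:

```scala
// Illustrative record type standing in for one row of the micro-batch
case class Record(key: String, ts: Long, value: String)

// Group rows by key and keep the row with the greatest timestamp per key,
// mirroring the row_number-over-window + rank === 1 filter above
def latestPerKey(rows: Seq[Record]): Map[String, Record] =
  rows.groupBy(_.key).map { case (k, group) => k -> group.maxBy(_.ts) }
```
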
############################
After applying the solution suggested by @theo-van-kraay, the executor logs show the error below (the job keeps running despite it):
23/02/13 07:28:55 INFO CassandraConnector: Connected to Cassandra cluster.
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Committed partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO Executor: Finished task 9.0 in stage 6.0 (TID 26). 1511 bytes result sent to driver
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 7 (task 24, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 18, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 3 (task 20, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 5 (task 22, attempt 0, stage 6.0)
23/02/13 07:28:56 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Unable to get Token Metadata
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$tokenMap$1(LocalNodeFirstLoadBalancingPolicy.scala:86)
at scala.Option.orElse(Option.scala:447)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.tokenMap(LocalNodeFirstLoadBalancingPolicy.scala:86)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.replicasForRoutingKey$1(LocalNodeFirstLoadBalancingPolicy.scala:103)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$8(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.flatMap(Option.scala:271)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$7(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.orElse(Option.scala:447)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$3(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.flatMap(Option.scala:271)
...
...
23/02/13 07:28:56 ERROR Utils: Aborting task
1 Answer
You can remove com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:
It is not required by the Spark 3 Cassandra connector and was only created for Spark 2. Also remove any references to it from your code.
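Under this answer's assumption (only com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 installed on the cluster, no helper library), a minimal connection setup might look like the sketch below; the host and credential values are placeholders, and the auth settings can equally live in the cluster Spark config:

```scala
// Minimal sketch: Spark 3 Cassandra connector talking to the Cosmos DB
// Cassandra API endpoint directly, without CosmosDbConnectionFactory.
// <account> and <primary-key> are placeholders for your own values.
spark.conf.set("spark.cassandra.connection.host", "<account>.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
spark.conf.set("spark.cassandra.auth.username", "<account>")
spark.conf.set("spark.cassandra.auth.password", "<primary-key>")
```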