I am trying to submit a Spark job to a cluster that reads a binary-encoded streaming DataFrame from Azure Event Hub and then transforms it using the protobuf file format.
build.sbt:
import sbtassembly.AssemblyPlugin.autoImport.ShadeRule
name := "AdQualitySpark"
version := "0.1"
scalaVersion := "2.12.12"
unmanagedJars in Compile += file("lib/geneva-java-0.1.0.jar")
unmanagedJars in Compile += file("lib/bond-7.0.0-preview-2017-11-22.jar")
unmanagedJars in Compile += file("lib/azure-cosmosdb-spark_2.4.0_2.11-3.6.7-uber.jar")
resolvers += "MMLSpark Repo" at "https://mmlspark.azureedge.net/maven"
libraryDependencies ++= Seq(
  "org.apache.commons" % "commons-lang3" % "3.1",
  "org.apache.spark" %% "spark-core" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.0",
  "org.apache.kafka" % "kafka-clients" % "0.8.2.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "3.0.0" % "provided",
  // "com.databricks" %% "spark-csv" % "1.5.0",
  "org.rogach" %% "scallop" % "3.1.5",
  "org.scalaj" %% "scalaj-http" % "2.4.1",
  "com.microsoft.azure" %% "azure-eventhubs-spark" % "2.3.15",
  "org.scalatest" %% "scalatest" % "3.0.0" % "provided",
  "com.microsoft.ml.spark" %% "mmlspark" % "1.0.0-rc3-27-b1c14008-SNAPSHOT",
  "com.holdenkarau" %% "spark-testing-base" % "3.0.0_1.0.0" % "provided",
  "com.typesafe" % "config" % "1.4.0",
  "com.thesamet.scalapb" %% "compilerplugin" % "0.9.4",
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion, // % "protobuf",
  "com.google.protobuf" % "protobuf-java-util" % "3.11.1",
  "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.9.0",
  "com.thesamet.scalapb" %% "scalapb-json4s" % "0.9.3",
  "com.microsoft.azure" % "azure-data-lake-store-sdk" % "2.3.8",
  "com.databricks" %% "dbutils-api" % "0.0.4",
  "io.delta" %% "delta-core" % "0.7.0",
  "com.microsoft.sqlserver" % "mssql-jdbc" % "8.2.1.jre8",
  "org.apache.spark" %% "spark-avro" % "3.0.0"
  // "com.databricks" %% "spark-avro" % "3.2.0"
)
// Needed for CosmosDB Spark connector.
dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.11.0",
  "com.google.guava" % "guava" % "15.0",
  "org.json4s" %% "json4s-jackson" % "3.6.10"
)
/*
lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % "0.8.2.1" excludeAll(excludeJpountz) // add more exclusions here*/
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "shadeproto.@1").inAll
)
scalacOptions += "-Xmacro-settings:materialize-derivations"
javaOptions in assembly += "-Xmx2g"
assemblyExcludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  // Keep only the whitelisted jars in the fat jar; exclude everything else.
  cp filter { f =>
    !(f.data.getName.contains("mml") || f.data.getName.contains("http") || f.data.getName.contains("proton")
      || f.data.getName.contains("spray") || f.data.getName.contains("scallop") || f.data.getName.contains("compat")
      || f.data.getName.contains("eventhub") || f.data.getName.contains("kafka") || f.data.getName.contains("scalapb")
      || f.data.getName.contains("compilerplugin") || f.data.getName.contains("lenses") || f.data.getName.contains("protoc")
      || f.data.getName.contains("frameless") || f.data.getName.contains("shadeproto")
      || f.data.getName.contains("geneva") || f.data.getName.contains("mssql-jdbc") || f.data.getName.contains("spark-avro")
      || f.data.getName.contains("bond") || f.data.getName.contains("cosmosdb") || f.data.getName.contains("delta"))
  }
  /*
  cp foreach { f => println(f.data.getName, f.data.getAbsoluteFile().length()) }
  cp */
}
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)
/*
(scalastyleConfig in Test) := baseDirectory.value / "scalastyleconfig.xml"
logBuffered in Test := false
lazy val testScalastyle = taskKey[Unit]("testScalastyle")
testScalastyle := scalastyle.in(Test).toTask("").value
(test in Test) := ((test in Test) dependsOn testScalastyle).value */
lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
compileScalastyle := scalastyle.in(Compile).toTask("").value
(compile in Compile) := ((compile in Compile) dependsOn compileScalastyle).value
parallelExecution in Test := false
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// TODO: start including tests in the build
logBuffered in Test := false
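The PB.targets setting above assumes the sbt-protoc plugin is enabled; my project/plugins.sbt is not shown, but for this ScalaPB 0.9.x setup it would look roughly like this (plugin versions are assumptions):
// project/plugins.sbt (assumed; not shown in the build above)
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.27")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.9.4"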
The source code that raises the exception is:
import com.google.protobuf.CodedInputStream
import org.apache.spark.SparkContext
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

def editorialAdStream(sc: SparkContext, spark: SparkSession): Dataset[EditorialAdEntity] = {
  val ehConnectionString = ConnectionStringBuilder(ConnectionString)
    .setEventHubName(EventHubName)
    .build
  val customEventhubParameters =
    EventHubsConf(ehConnectionString)
      .setConsumerGroup(ConsumerGroup)
      .setEndingPosition(EventPosition.fromEndOfStream)
      .setMaxEventsPerTrigger(MaxEventsPerTrigger)
      .setPrefetchCount(PrefetchCount)
  // Read the raw event body as a binary column.
  val binaryAdStream = spark
    .readStream
    .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
    .options(customEventhubParameters.toMap)
    .load()
    .selectExpr("body")
    .as(Encoders.BINARY)
  // Decode each protobuf-encoded payload into the generated ScalaPB message.
  val adStream: Dataset[EditorialAdEntity] =
    binaryAdStream
      .map(m => EditorialAdEntity().mergeFrom(CodedInputStream.newInstance(m)))
  adStream
}
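The import that puts a protobuf Encoder into implicit scope is not shown in the snippet, but the stack trace below shows scalapb.spark.Implicits in play, so presumably something like:
import scalapb.spark.Implicits._ // assumed: supplies the frameless-backed Encoder[EditorialAdEntity] consumed by .map above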
The exception stack trace is:
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.<init>(Lorg/apache/spark/sql/types/StructType;ZLscala/collection/Seq;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/reflect/ClassTag;)V
at frameless.TypedExpressionEncoder$.apply(TypedExpressionEncoder.scala:45)
at scalapb.spark.Implicits.typedEncoderToEncoder(TypedEncoders.scala:125)
at scalapb.spark.Implicits.typedEncoderToEncoder$(TypedEncoders.scala:122)
at scalapb.spark.Implicits$.typedEncoderToEncoder(TypedEncoders.scala:128)
at Utils.MessagingQueues.EventHubSourceReader$.editorialAdStream(EventHubSourceReader.scala:57)
at Utils.MessagingQueues.SourceReader$.readEditorialAdSource(SourceReader.scala:39)
at Workflows.Streaming.PROD.UnifiedNRT.UnifiedNRTHelper$.fetchInputStream(UnifiedNRTHelper.scala:62)
at Workflows.Streaming.PROD.UnifiedNRT.UnifiedNRT$.main(UnifiedNRT.scala:50)
Facts: [1] the project uses Spark 3.0. [2] The Scala version is 2.12.12. [3] Following the official documentation, I tried changing the scalapb libs to different versions, but it did not help: https://scalapb.github.io/docs/sparksql/#setting-up-your-project
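My reading of the trace (possibly wrong) is a binary-compatibility problem: the five-argument ExpressionEncoder constructor it looks up exists in Spark 2.4 but not in Spark 3.0, so a frameless/sparksql-scalapb build compiled against 2.4 (the 0.9.x line) fails at runtime on 3.0. What I tried, per the docs, was along these lines (the 0.10.x line of sparksql-scalapb is, as I understand it, the one built for Spark 3; the exact patch version below is an assumption):
// build.sbt sketch: swap the Spark 2.4-era sparksql-scalapb 0.9.0 for the Spark 3 line
libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.10.4" // version assumed, not verified
)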
Please help resolve this exception.