I'm trying to read data from a Kafka stream whose values are Avro-serialized. I have no problem reading the data and deserializing the keys as strings, but when I try to deserialize the values with the from_avro function I get this exception:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/functions$
at DataFrameExample$.main(DataFrameExample.scala:41)
at DataFrameExample.main(DataFrameExample.scala)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.functions$
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 12 more
sbt file:
name := "test"
organization := "com.databricks"
version := "1"
scalaVersion := "2.12.17"
// Spark Information
val sparkVersion = "3.3.0"
// allows us to include spark packages
resolvers += "bintray-spark-packages" at
"https://dl.bintray.com/spark-packages/maven/"
resolvers += "Typesafe Simple Repository" at
"https://repo.typesafe.com/typesafe/simple/maven-releases/"
resolvers += "MavenRepository" at
"https://mvnrepository.com/"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-avro" % sparkVersion % Provided
)
Code:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import java.nio.file.{Files, Paths}
object DataFrameExample extends Serializable {
def main(args: Array[String]) = {
val spark = SparkSession
.builder()
.appName("Spark Example")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN");
val currentDirectory = new java.io.File(".").getCanonicalPath
println(currentDirectory);
val df = spark.readStream
.format("kafka")
.option(
"kafka.bootstrap.servers",
"localhost:9091,localhost:9093,localhost:9094"
)
.option("startingOffsets", "latest")
.option("subscribe", "test-person-activity-partitions-replication-qwe")
.load()
val jsonFormatSchema = new String(
Files.readAllBytes(
Paths.get("./src/main/resources/avro/person-activity.avsc")
)
)
val df2 = df.select(
df.col("key").cast("string"),
from_avro($"value", jsonFormatSchema).as("value")
)
df2.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
}
}
When running the application with spark-submit, I also provide spark-sql-kafka-0-10 on the command line, because providing it from sbt gave me some other errors. That isn't directly related to this question, but if anyone knows why, feel free to answer that too.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class DataFrameExample --master local target/scala-2.12/test_2.12-1.jar
Sorry if this is a duplicate, but I have looked at every answer on SO and elsewhere while searching for similar errors.
2 Answers

hgb9j2n6 #1
As described in the documentation, add the missing package on the command line, and make sure the version values are correct.
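For example, a minimal sketch of the adjusted command, assuming the same Spark 3.3.0 / Scala 2.12 versions used in the question:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-avro_2.12:3.3.0 --class DataFrameExample --master local target/scala-2.12/test_2.12-1.jar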
pod7payv #2
You specified the dependency in your sbt file, but you marked it as Provided:

"org.apache.spark" %% "spark-avro" % sparkVersion % Provided

This means the dependency is excluded from the classpath by default, so it is not included in the fat jar artifact, and that is why the spark-submit command cannot find it at runtime. To solve this, you have two options:

1. Pass the jar to the spark-submit command with the --packages option:

--packages "org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"

2. Change the dependency's scope from Provided to Compile so it gets bundled into the jar:

"org.apache.spark" %% "spark-avro" % sparkVersion % Compile