Spark Streaming: java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/functions$

q35jwt9p · asked on 2023-03-19 in Apache

I am trying to read data from a Kafka stream whose values are serialized with Avro. I have no problem reading the data and deserializing the key as a string, but when I try to deserialize the value with the from_avro function I get this exception:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/functions$
    at DataFrameExample$.main(DataFrameExample.scala:41)
    at DataFrameExample.main(DataFrameExample.scala)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.functions$
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 12 more

sbt file:

name := "test"
organization := "com.databricks"
version := "1"
scalaVersion := "2.12.17"
// Spark Information
val sparkVersion = "3.3.0"
// allows us to include spark packages
resolvers += "bintray-spark-packages" at
  "https://dl.bintray.com/spark-packages/maven/"
resolvers += "Typesafe Simple Repository" at
  "https://repo.typesafe.com/typesafe/simple/maven-releases/"
resolvers += "MavenRepository" at
  "https://mvnrepository.com/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-avro" % sparkVersion % Provided
)

Code:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import java.nio.file.{Files, Paths}

object DataFrameExample extends Serializable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark Example")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

    val currentDirectory = new java.io.File(".").getCanonicalPath
    println(currentDirectory)

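    // Kafka source: key and value arrive as raw binary columns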
    val df = spark.readStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "localhost:9091,localhost:9093,localhost:9094"
      )
      .option("startingOffsets", "latest")
      .option("subscribe", "test-person-activity-partitions-replication-qwe")
      .load()

    val jsonFormatSchema = new String(
      Files.readAllBytes(
        Paths.get("./src/main/resources/avro/person-activity.avsc")
      )
    )

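    // keep the key as a string; decode the Avro value with the schema above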
    val df2 = df.select(
      df.col("key").cast("string"),
      from_avro($"value", jsonFormatSchema).as("value")
    )

    df2.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

When running the application with spark-submit I also provide spark-sql-kafka-0-10. I could not provide it from sbt because I got some other errors; that is not directly related to this question, but feel free to answer if you know the reason.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class DataFrameExample --master local target/scala-2.12/test_2.12-1.jar

Sorry if this is a duplicate, but I have looked at every answer on SO and elsewhere while searching for similar errors.

hgb9j2n6 #1

As described in the documentation, add the packages on the command line; spark-submit resolves them (and their transitive dependencies) from Maven Central at launch. Make sure the version values match your Spark version (3.3.0) and Scala binary version (2.12).

spark-submit --packages "org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0" \
             --class DataFrameExample \
             --master local target/scala-2.12/test_2.12-1.jar

pod7payv #2

You specified the dependency in your sbt file, but you marked it as Provided: "org.apache.spark" %% "spark-avro" % sparkVersion % Provided
A Provided dependency is excluded from the runtime classpath by default, so it is not included in the fat JAR artifact, which is why the spark-submit command cannot find that dependency at runtime.
To solve this, you have two options:

  • Pass the JARs to the spark-submit command with the --packages option:

--packages "org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"

  • Change the dependency's scope to Compile (see the packaging note after this list):

"org.apache.spark" %% "spark-avro" % sparkVersion % Compile
