结构化流kafka spark java.lang.noclassdeffounderror:org/apache/spark/internal/logging

nkoocmlb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(542)

我发现很多关于这个问题的常见问题,但都不管用。
我是一个java和bigdata的新手,java依赖管理对我来说很糟糕,如果第三方库什么都不告诉你的话,你得猜应该使用哪个包和版本,哪个包会冲突
我想从kafka主题解析json数据并保存到hbase。
主代码

package com.yizhisec.bigdata;

import com.yizhisec.bigdata.model.Traffic;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.io.IOException;
import java.util.Properties;

public class KafkaStructStream {

    private Dataset<Row> initStructKafka() throws IOException {
        Properties kafkaProp = Config.getProp();
        SparkSession spark = SparkSession
                .builder()
                .appName("Kafka")
                .master("local[2]")
                .getOrCreate();
        return spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", kafkaProp.getProperty("kafka.broker.list"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.truststore.location", Config.getPath(Config.KAFKA_JKS))
                .option("kafka.ssl.truststore.password", kafkaProp.getProperty("kafka.jks.passwd"))
                .option("startingOffsets", "latest")
                .option("subscribe", kafkaProp.getProperty("kafka.topic"))
                .load();
    }

    private void run() {
        Dataset<Row> df = null;
        try {
            df = initStructKafka();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        df.printSchema();
        Dataset<Traffic> ds = df.as(ExpressionEncoder.javaBean(Traffic.class));

        StreamingQuery query = ds.writeStream().foreach(new ForeachWriter<Traffic>() {
            @Override
            public boolean open(long partitionId, long epochId) {
                return false;
            }

            @Override
            public void process(Traffic value) {
                System.out.println(value);
            }

            @Override
            public void close(Throwable errorOrNull) {

            }
        }).start();

//        StreamingQuery query = ds.writeStream().format("console")
//                .trigger(Trigger.Continuous("2 seconds"))
//                .start();

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        KafkaStructStream k = new KafkaStructStream();
        k.run();
    }

}

交通等级

public class Traffic {
    private Long guid;
    private int time;
    private int end_time;
    private String srcip;
    private String srcmac;
    private int srcport;
    private String destip;
    private String destmac;
    private int destport;
    private String proto;
    private String appproto;
    private Long upsize;
    private Long downsize;

    getter and setter
}

附属国

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.4.4</spark.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!--        kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>2.4.4</version>
            <scope>provided</scope>
        </dependency>
</dependencies>

错误

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.yizhisec.bigdata.KafkaStructStream.initStructKafka(KafkaStructStream.java:20)
    at com.yizhisec.bigdata.KafkaStructStream.run(KafkaStructStream.java:37)
    at com.yizhisec.bigdata.KafkaStructStream.main(KafkaStructStream.java:76)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 15 more

解决方案

经过一番尝试。最后,我找到了解决办法。一个愚蠢的违约选择浪费了我一天的时间。我太蠢了,发现类已经存在,但由于提供了作用域,导入在运行时失败

roejwanj

roejwanj1#

您必须猜测应该使用哪个包和版本
不是真的猜。。。Spark2.4.x是用Scala2.12构建的,这是有文档记录的。你的pom上写着scala 2.11.x
您还应该删除 spark-streaming-kafka_2.11 和kafka依赖关系,因为您使用的是结构化流,这需要 sql-kafka 一个,但是没有提供,所以删除scope标记
如果你总是用 <version>${spark.version}</version> ,那你就不用猜了
旁注:有spark hbase库,所以您不需要编写自己的foreach writer

相关问题