I have found many existing questions about this error, but none of their answers worked for me.
I am new to Java and big data, and Java dependency management is painful for me: when a third-party library does not tell you anything, you have to guess which packages and versions to use and which ones will conflict.
I want to parse JSON data from a Kafka topic and save it to HBase.
Main code
package com.yizhisec.bigdata;
import com.yizhisec.bigdata.model.Traffic;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.io.IOException;
import java.util.Properties;
public class KafkaStructStream {

    private Dataset<Row> initStructKafka() throws IOException {
        Properties kafkaProp = Config.getProp();
        SparkSession spark = SparkSession
                .builder()
                .appName("Kafka")
                .master("local[2]")
                .getOrCreate();
        return spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", kafkaProp.getProperty("kafka.broker.list"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.truststore.location", Config.getPath(Config.KAFKA_JKS))
                .option("kafka.ssl.truststore.password", kafkaProp.getProperty("kafka.jks.passwd"))
                .option("startingOffsets", "latest")
                .option("subscribe", kafkaProp.getProperty("kafka.topic"))
                .load();
    }
    private void run() {
        Dataset<Row> df = null;
        try {
            df = initStructKafka();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        df.printSchema();
        Dataset<Traffic> ds = df.as(ExpressionEncoder.javaBean(Traffic.class));
        StreamingQuery query = ds.writeStream().foreach(new ForeachWriter<Traffic>() {
            @Override
            public boolean open(long partitionId, long epochId) {
                return false;
            }

            @Override
            public void process(Traffic value) {
                System.out.println(value);
            }

            @Override
            public void close(Throwable errorOrNull) {
            }
        }).start();
        // StreamingQuery query = ds.writeStream().format("console")
        //         .trigger(Trigger.Continuous("2 seconds"))
        //         .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaStructStream k = new KafkaStructStream();
        k.run();
    }
}
Traffic class
public class Traffic {
    private Long guid;
    private int time;
    private int end_time;
    private String srcip;
    private String srcmac;
    private int srcport;
    private String destip;
    private String destmac;
    private int destport;
    private String proto;
    private String appproto;
    private Long upsize;
    private Long downsize;

    // getters and setters
}
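Since the goal is to parse the JSON in the Kafka value column into Traffic objects, here is a minimal, untested sketch of that step (an assumption on my part: the JSON field names match the bean properties above). The Kafka source only exposes binary key/value columns plus metadata, so the payload has to be cast to a string and parsed with from_json before it can be mapped onto Traffic:

// Sketch only: something like this could replace the df.as(ExpressionEncoder.javaBean(Traffic.class)) line in run().
// Extra imports needed:
// import org.apache.spark.sql.Encoders;
// import static org.apache.spark.sql.functions.col;
// import static org.apache.spark.sql.functions.from_json;
Dataset<Traffic> ds = df
        .selectExpr("CAST(value AS STRING) AS json")                                   // Kafka delivers the payload as bytes
        .select(from_json(col("json"), Encoders.bean(Traffic.class).schema()).as("t")) // parse the JSON using the bean's schema
        .select("t.*")                                                                 // flatten the parsed struct into columns
        .as(Encoders.bean(Traffic.class));                                             // map the columns onto Traffic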
Dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11.12</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>2.4.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
Error
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.yizhisec.bigdata.KafkaStructStream.initStructKafka(KafkaStructStream.java:20)
at com.yizhisec.bigdata.KafkaStructStream.run(KafkaStructStream.java:37)
at com.yizhisec.bigdata.KafkaStructStream.main(KafkaStructStream.java:76)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 15 more
Solution
After some more attempts I finally found the fix. A silly default choice cost me a whole day: the classes were actually there, but because the dependency scope was set to provided, they could not be loaded at runtime.
1 Answer
"You have to guess which package and version should be used"

Not really guess... Spark 2.4.x is built with Scala 2.12, and that is documented. Your pom says Scala 2.11.x.
You should also remove the
spark-streaming-kafka_2.11
and the kafka dependency, because you are using Structured Streaming, which needs the sql-kafka one instead. That one is not provided at runtime, so remove its scope tag. And if you always use
<version>${spark.version}</version>
then you don't have to guess.

Side note: there are Spark HBase libraries, so you don't need to write your own ForeachWriter.
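For what it's worth, here is a rough sketch of how the dependency section could look after those changes: Scala 2.12 artifacts throughout, the spark-streaming-kafka and kafka artifacts dropped, and no provided scope on spark-sql-kafka-0-10. The exact Scala patch version is an assumption; match it to your Spark build, and re-add provided scope only for jars that spark-submit already puts on the classpath:

<properties>
    <spark.version>2.4.4</spark.version>
    <!-- assumption: pick the 2.12.x patch release that matches your Spark 2.4.4 build -->
    <scala.version>2.12.8</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Structured Streaming lives in spark-sql; spark-core comes in transitively -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Kafka source for Structured Streaming; not on the Spark runtime classpath, so no provided scope -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>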