kafka: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: connector not found

Asked by xnifntxz on 2021-06-06 in Kafka

When I run the Kafka code, I get:

1) ERROR StreamExecution: Query [id=c6426655-446f-4306-91ba-d78e68e05c15, runId=420382c1-8558-45a1-b26d-f6299044fa04] terminated with error java.lang.ExceptionInInitializerError
2) Exception in thread "stream execution thread for [id=c6426655-446f-4306-91ba-d78e68e05c15, runId=420382c1-8558-45a1-b26d-f6299044fa04]" java.lang.ExceptionInInitializerError
3) Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: null
sbt dependencies:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.3" % "provided"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.1.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.1.1"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-scala
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.1.1"
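For reference, the same dependency set can be written with the repeated version strings factored into vals, so the Spark artifacts (and the Kafka artifacts) cannot drift apart accidentally. This is a sketch only; the version numbers are the ones listed above:

```scala
// build.sbt sketch: one version val per family of artifacts.
val sparkVersion = "2.2.3"
val kafkaVersion = "2.1.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % sparkVersion,
  "org.apache.spark" %% "spark-sql"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.kafka" %% "kafka"                % kafkaVersion,
  "org.apache.kafka" %  "kafka-clients"        % kafkaVersion,
  "org.apache.kafka" %% "kafka-streams-scala"  % kafkaVersion
)
```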

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

object demo1 {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir","c:\\hadoop\\")

    val spark: SparkSession = SparkSession.builder
      .appName("My Spark Application")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
      .getOrCreate

    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark.sql.shuffle.partitions", "2")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "162.244.80.189:9092")
      .option("startingOffsets", "earliest")
      .option("group.id","test1")
      .option("subscribe", "demo11")
      .load()

    import spark.implicits._

    val dsStruc = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp").as[(String, String, Timestamp)]

    val abc = df.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}

Answer by roejwanj:

I had the same problem. I was using the wrong version of the spark-sql-kafka library (2.2.0 instead of 2.3.0). The configuration that worked for me was:

org.apache.spark spark-core_2.11 2.3.0 provided
org.apache.spark spark-sql_2.11 2.3.0
org.apache.spark spark-sql-kafka-0-10_2.11 2.3.0
org.apache.kafka kafka-clients 0.10.1.0
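In sbt form, that working configuration would look roughly like this (a sketch; the `_2.11` artifact suffixes in the list imply a Scala 2.11 build, which `%%` appends automatically):

```scala
// Sketch of the working versions as sbt dependencies,
// assuming Scala 2.11 to match the _2.11 artifact suffixes.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"            % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
  "org.apache.kafka" %  "kafka-clients"        % "0.10.1.0"
)
```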
Hope this helps. This article gave me the idea:
https://community.hortonworks.com/content/supportkb/222428/error-microbatchexecution-query-id-567e4e77-9457-4.html
