kafka+spark错误microbatchexecution:查询

xe55xuns  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(206)

我正在尝试运行这个ibm开发人员代码模式中指定的程序。目前,我只做本地部署https://github.com/ibm/kafka-streaming-click-analysis?cm_sp=developer--determine-trending-topics-with-clickstream-analysis--get-the-code
因为它有点旧,我的kafka和scala版本并不完全符合代码模式的要求。我使用的版本有:
斯卡拉:2.4.6Kafka0.10.2.1
在最后一步,我得到以下错误:

ERROR MicroBatchExecution: Query [id = f4dfe12f-1c99-427e-9f75-91a77f6e51a7, 
runId = c9744709-2484-4ea1-9bab-28e7d0f6b511] terminated with error
org.apache.spark.sql.catalyst.errors.package$TreeNodeException

以及执行树
我下面的步骤如下:

1. Start Zookeeper
2. Start Kafka
3. cd kafka_2.10-0.10.2.1
4. tail -200 data/2017_01_en_clickstream.tsv | bin/kafka-console-producer.sh --broker-list ip:port --topic clicks --producer.config=config/producer.properties

我已经下载了数据集并将其存储在kafkaèu2.10-0.10.2.1目录中名为data的目录中

cd $SPARK_DIR
bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6

由于在spark安装期间没有设置spark目录,因此我正在导航包含spark的目录以运行此命令

scala> import scala.util.Try

scala> case class Click(prev: String, curr: String, link: String, n: Long)

scala> def parseVal(x: Array[Byte]): Option[Click] = {
    val split: Array[String] = new Predef.String(x).split("\\t")
    if (split.length == 4) {
      Try(Click(split(0), split(1), split(2), split(3).toLong)).toOption
    } else
      None
  }

scala> val records = spark.readStream.format("kafka")
                  .option("subscribe", "clicks")
                  .option("failOnDataLoss", "false")
                  .option("kafka.bootstrap.servers", "localhost:9092").load()

scala>
val messages = records.select("value").as[Array[Byte]]
                 .flatMap(x => parseVal(x))
                 .groupBy("curr")
                 .agg(Map("n" -> "sum"))
                 .sort($"sum(n)".desc)
val query = messages.writeStream
          .outputMode("complete")
          .option("truncate", "false")
          .format("console")
          .start()

最后一条语句query=。。。正在给出上述错误。任何帮助都将不胜感激。提前谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题