我正在尝试运行这个ibm开发人员代码模式中指定的程序。目前,我只做本地部署https://github.com/ibm/kafka-streaming-click-analysis?cm_sp=developer--determine-trending-topics-with-clickstream-analysis--get-the-code
因为它有点旧,我的kafka和scala版本并不完全符合代码模式的要求。我使用的版本有:
斯卡拉:2.4.6Kafka0.10.2.1
在最后一步,我得到以下错误:
ERROR MicroBatchExecution: Query [id = f4dfe12f-1c99-427e-9f75-91a77f6e51a7,
runId = c9744709-2484-4ea1-9bab-28e7d0f6b511] terminated with error
org.apache.spark.sql.catalyst.errors.package$TreeNodeException
以及执行树
我下面的步骤如下:
1. Start Zookeeper
2. Start Kafka
3. cd kafka_2.10-0.10.2.1
4. tail -200 data/2017_01_en_clickstream.tsv | bin/kafka-console-producer.sh --broker-list ip:port --topic clicks --producer.config=config/producer.properties
我已经下载了数据集并将其存储在kafkaèu2.10-0.10.2.1目录中名为data的目录中
cd $SPARK_DIR
bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6
由于在spark安装期间没有设置spark目录,因此我正在导航包含spark的目录以运行此命令
scala> import scala.util.Try
scala> case class Click(prev: String, curr: String, link: String, n: Long)
scala> def parseVal(x: Array[Byte]): Option[Click] = {
val split: Array[String] = new Predef.String(x).split("\\t")
if (split.length == 4) {
Try(Click(split(0), split(1), split(2), split(3).toLong)).toOption
} else
None
}
scala> val records = spark.readStream.format("kafka")
.option("subscribe", "clicks")
.option("failOnDataLoss", "false")
.option("kafka.bootstrap.servers", "localhost:9092").load()
scala>
val messages = records.select("value").as[Array[Byte]]
.flatMap(x => parseVal(x))
.groupBy("curr")
.agg(Map("n" -> "sum"))
.sort($"sum(n)".desc)
val query = messages.writeStream
.outputMode("complete")
.option("truncate", "false")
.format("console")
.start()
最后一条语句query=。。。正在给出上述错误。任何帮助都将不胜感激。提前谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!