我试着从Kafka加载数据,这是成功的,但我无法转换为Sparkrdd,
val kafkaParams = Map("metadata.broker.list" -> "IP:6667,IP:6667")
val offsetRanges = Array(
OffsetRange("first_topic", 0,1,1000)
)
val ssc = new StreamingContext(new SparkConf, Seconds(60))
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
现在我如何读取这个流对象???我的意思是把它转换成sparkDataframe并执行一些计算
我试着转换成Dataframe
stream.foreachRDD { rdd =>
println("Hello")
import sqlContext.implicits._
val dataFrame = rdd.map {case (key, value) => Row(key, value)}.toDf()
}
但是todf不是工作错误:值todf不是org.apache.spark.rdd.rdd[org.apache.spark.sql.row]的成员
2条答案
按热度按时间hmmo2u0o1#
它很旧,但我认为您在从行创建df时忘记了添加模式:
(在Spark壳2.2.0中测试)
yyyllmsg2#