我用 Kafka 和 Spark 来统计 Twitter 上最流行的标签。这是我要运行的 Scala 对象:
package spark.example
import java.util.HashMap
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
/**
* A Spark Streaming - Kafka integration to receive twitter
* data from kafka topic and find the popular hashtags
*
* Arguments: <zkQuorum> <consumer-group> <topics> <numThreads>
* <zkQuorum> - The zookeeper hostname
* <consumer-group> - The Kafka consumer group
* <topics> - The kafka topic to subscribe to
* <numThreads> - Number of kafka receivers to run in parallel
*
* More discussion at stdatalabs.blogspot.com
*
* @author Sachin Thirumala
*/
object KafkaSparkPopularHashTags {

  /** Spark configuration: local master with 6 threads, 1g executor memory, receiver write-ahead log on. */
  val conf: SparkConf = new SparkConf()
    .setMaster("local[6]")
    .setAppName("Spark Streaming - Kafka Producer - PopularHashTags")
    .set("spark.executor.memory", "1g")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  /** Spark context backing the streaming job; created as soon as this object is initialised. */
  val sc: SparkContext = new SparkContext(conf)

  /** Message printed when the command line does not have exactly four arguments. */
  private val Usage =
    "Usage: KafkaSparkPopularHashTags <zkQuorum> <consumer-group> <topics> <numThreads>"

  /**
   * Program entry point.
   *
   * @param args exactly four values: zookeeper quorum, Kafka consumer group,
   *             comma-separated topic list, and the number of receiver threads per topic.
   *             Any other argument count prints a usage message to stderr and exits with status 1.
   * @throws NumberFormatException if `numThreads` is not an integer
   */
  def main(args: Array[String]): Unit = {
    //sc.setLogLevel("WARN")
    args match {
      case Array(zkQuorum, group, topics, numThreads) =>
        run(zkQuorum, group, topics, numThreads.toInt)
      case _ =>
        // A destructuring `val Array(...) = args` would fail with an opaque MatchError here.
        System.err.println(Usage)
        sys.exit(1)
    }
  }

  /**
   * Builds the streaming pipeline, starts it, and blocks until the context terminates.
   *
   * @param zkQuorum   zookeeper hostname/ip handed to the Kafka receiver
   * @param group      Kafka consumer group
   * @param topics     comma-separated Kafka topic names
   * @param numThreads number of receiver threads assigned to every topic
   */
  private def run(zkQuorum: String, group: String, topics: String, numThreads: Int): Unit = {
    // One micro-batch every 2 seconds.
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Every topic gets the same number of receiver threads.
    val topicMap = topics.split(",").map((_, numThreads)).toMap

    // Keep only the value of each Kafka (key, value) message.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Whitespace-separated tokens that start with '#'.
    val hashTags = lines.flatMap(_.split(" ")).filter(_.startsWith("#"))

    // Output operations are registered in the same order as before:
    // raw lines, 60 s ranking, 10 s ranking, message count.
    lines.print()
    printTopHashTags(hashTags, 60)
    printTopHashTags(hashTags, 10)
    lines.count().map(cnt => "Received " + cnt + " kafka messages.").print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Registers an output operation that, for every batch, prints the ten most frequent
   * hashtags seen during the trailing window together with the number of distinct hashtags.
   *
   * @param hashTags      stream of individual hashtag tokens
   * @param windowSeconds length of the trailing window, in seconds
   */
  private def printTopHashTags(
      hashTags: org.apache.spark.streaming.dstream.DStream[String],
      windowSeconds: Int): Unit = {
    val ranked = hashTags
      .map((_, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(windowSeconds))
      .map { case (tag, count) => (count, tag) }
      // Sort by count, highest first; without this, take(10) returns ten arbitrary
      // hashtags rather than the most popular ones.
      .transform(rdd => rdd.sortByKey(ascending = false))

    ranked.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last %s seconds (%s total):".format(windowSeconds, rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
  }
}
但每次我尝试用以下参数运行代码时:localhost:2181 spark-streaming-consumer-group tweets 2(主题“tweets”已经创建,twitterproducer 正在运行),我都会得到以下错误:
Exception in thread "main" java.lang.InstantiationException: org.apache.spark.util.SystemClock
at java.lang.Class.newInstance(Class.java:427)
at org.apache.spark.streaming.scheduler.JobGenerator.liftedTree1$1(JobGenerator.scala:52)
at org.apache.spark.streaming.scheduler.JobGenerator.<init>(JobGenerator.scala:51)
at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:54)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:183)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
at spark.example.KafkaSparkPopularHashTags$.main(KafkaSparkPopularHashTags.scala:48)
at spark.example.KafkaSparkPopularHashTags.main(KafkaSparkPopularHashTags.scala)
Caused by: java.lang.NoSuchMethodException: org.apache.spark.util.SystemClock.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
错误表明第48行有问题,即:
val ssc = new StreamingContext(sc, Seconds(2))
系统似乎无法实例化该对象。有什么解决这个问题的建议吗?其他信息:我正在使用 Scala 2.12,我已经尝试降级到 Scala 2.11 甚至 2.10。我试图重现这个实验:http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-3-real-time.html
1条答案
按热度按时间bkkx9g8r1#
[更新]经过多次尝试,我发现我必须使用scala 2.11.1版,感谢您的宝贵帮助