Counting popular hashtags with Kafka

gv8xihay  posted on 2021-06-07 in Kafka

I am using Kafka and Spark to count the most popular hashtags on Twitter. This is the Scala object I am trying to run:

package spark.example

import java.util.HashMap

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel

/**
  * A Spark Streaming - Kafka integration to receive twitter
  * data from kafka topic and find the popular hashtags
  *
  * Arguments: <zkQuorum> <consumer-group> <topics> <numThreads>
  * <zkQuorum>       - The zookeeper hostname
  * <consumer-group> - The Kafka consumer group
  * <topics>         - The kafka topic to subscribe to
  * <numThreads>     - Number of kafka receivers to run in parallel
  *
  * More discussion at stdatalabs.blogspot.com
  *
  * @author Sachin Thirumala
  */

object KafkaSparkPopularHashTags {

  val conf = new SparkConf().setMaster("local[6]").setAppName("Spark Streaming - Kafka Producer - PopularHashTags").set("spark.executor.memory", "1g")

  conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

  val sc = new SparkContext(conf)

  def main(args: Array[String]) {

    // sc.setLogLevel("WARN")

    // Create an array of arguments: zookeeper hostname/ip, consumer group, topic name, num of threads
    val Array(zkQuorum, group, topics, numThreads) = args

    // Set the Spark StreamingContext to create a DStream for every 2 seconds
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Map each topic to a thread
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Map value from the kafka message (k, v) pair
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    // Filter hashtags
    val hashTags = lines.flatMap(_.split(" ")).filter(_.startsWith("#"))

    // Get the top hashtags over the previous 60/10 sec window
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }

    lines.print()

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    lines.count().map(cnt => "Received " + cnt + " kafka messages.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}

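But every time I try to run the code with the following arguments: localhost:2181 spark-streaming-consumer-group tweets 2 (the topic "tweets" has already been created and the Twitter producer is running), I get the following error: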

Exception in thread "main" java.lang.InstantiationException: org.apache.spark.util.SystemClock
at java.lang.Class.newInstance(Class.java:427)
at org.apache.spark.streaming.scheduler.JobGenerator.liftedTree1$1(JobGenerator.scala:52)
at org.apache.spark.streaming.scheduler.JobGenerator.<init>(JobGenerator.scala:51)
at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:54)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:183)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
at spark.example.KafkaSparkPopularHashTags$.main(KafkaSparkPopularHashTags.scala:48)
at spark.example.KafkaSparkPopularHashTags.main(KafkaSparkPopularHashTags.scala)
Caused by: java.lang.NoSuchMethodException: org.apache.spark.util.SystemClock.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)

The error points to line 48, which is:

val ssc = new StreamingContext(sc, Seconds(2))

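The system seems unable to instantiate the object. Any suggestions for resolving this? Additional information: I am using Scala 2.12, and I have already tried downgrading to Scala 2.11 and even 2.10. I am trying to reproduce this experiment: http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-3-real-time.html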
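When a constructor lookup such as `SystemClock.<init>()` fails at runtime, it can help to see which Scala library and which Spark jars the JVM actually loaded. The following is a minimal diagnostic sketch, not part of the original program; the object name `ClasspathCheck` and the helper `locationOf` are hypothetical names introduced here for illustration.

```scala
import scala.util.Try

// Hypothetical helper object, introduced only for this diagnostic sketch.
object ClasspathCheck {

  // Returns the jar (or directory) a class was loaded from, if the class
  // can be found and the JVM reports a code source for it.
  def locationOf(className: String): Option[String] =
    Try(Class.forName(className)).toOption
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  def main(args: Array[String]): Unit = {
    // Version of the Scala library that is on the runtime classpath
    println("Scala: " + scala.util.Properties.versionNumberString)

    // The two classes involved in the stack trace above
    Seq(
      "org.apache.spark.util.SystemClock",
      "org.apache.spark.streaming.StreamingContext"
    ).foreach { name =>
      println(name + " -> " + locationOf(name).getOrElse("not found or no code source"))
    }
  }
}
```

If the two Spark classes resolve to jars with different version numbers or different Scala suffixes (for example `_2.10` and `_2.11`), the classpath is mixing incompatible builds.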

bkkx9g8r  #1

[Update] After many attempts, I found that I had to use Scala 2.11.1. Thank you for your valuable help.
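For readers hitting the same error, a build definition that pins one Scala version and one Spark version for every Spark module might look like the sketch below. Only the Scala version comes from this answer. The Spark version `1.6.2` is an assumption, chosen because the imports in the question (`org.apache.spark.streaming.kafka._` and `org.apache.spark.streaming.twitter._`) match the Spark 1.x module layout; as far as I know, the Spark 1.x line was never published for Scala 2.12, which would be consistent with the question's setup failing.

```scala
// build.sbt: a sketch under stated assumptions, not the poster's actual build file

// Scala version reported as working in the answer above
scalaVersion := "2.11.1"

// Assumed Spark release (not stated in the post); the point is that every
// Spark module below uses this same single version
val sparkVersion = "1.6.2"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (_2.11) so all artifacts match scalaVersion
  "org.apache.spark" %% "spark-core"              % sparkVersion,
  "org.apache.spark" %% "spark-streaming"         % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka"   % sparkVersion,
  "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion
)
```

Using `%%` instead of hard-coding a suffix such as `spark-core_2.10` keeps the Scala binary version of every dependency in step with `scalaVersion`, which avoids mixing jars built for different Scala or Spark releases.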
