Cannot read Twitter data published to a Kafka topic with Spark (Scala)

46qrfjad posted on 2021-06-06 in Kafka

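I am trying a basic use case: reading Twitter data and publishing it to a Kafka topic. I referred to various posts and was eventually able to put the code together, but unfortunately I cannot read the messages with a consumer. Below is my Spark code: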

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.Status
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }
import org.apache.spark.SparkContext

object Testing {

  def main(args: Array[String]): Unit = {
    val appName = "TwitterData"
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", appName)
    val sc = new SparkContext(conf)
    //create context
    val ssc = new StreamingContext(sc, Seconds(10))

    // values of Twitter API.
    val consumerKey = "" // Your consumerKey
    val consumerSecret = "" // your API secret
    val accessToken = "" // your access token
    val accessTokenSecret = "" // your token secret

    //Connection to Twitter API
    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)

    val auth = new OAuthAuthorization(cb.build)
    val tweets = TwitterUtils.createStream(ssc, Some(auth))
    val englishTweets = tweets.filter(_.getLang() == "en")

    val statuses = englishTweets.map(status => (status.getText(), status.getUser.getName(), status.getUser.getScreenName(), status.getCreatedAt.toString))

    statuses.foreachRDD { (rdd, time) =>
      println("INSIDE ForEACH")
      rdd.foreachPartition { partitionIter =>
        val props = new Properties()
        val bootstrap = "localhost:9092" //-- your external ip of GCP VM, example: 10.0.0.1:9092
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("bootstrap.servers", bootstrap)
        val producer = new KafkaProducer[String, String](props)
        partitionIter.foreach { elem =>
          val dat = elem.toString()
          println("before data....")
          println(dat)
          val data = new ProducerRecord[String, String]("twitterData", null, dat) // "twitterData" is the name of Kafka topic
          producer.send(data)
        }
        producer.flush()
        producer.close()
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

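I start a console consumer with the command below to read the messages, but nothing comes through. It does not throw any error; the messages simply never appear in the consumer window.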

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic twitterData
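To tell a Kafka-side problem apart from a Spark-side one, one could first publish a single record to `twitterData` without Spark and watch the same console consumer. The sketch below assumes only that `kafka-clients` is on the classpath and that the broker listens on `localhost:9092` as in the question; the object name `KafkaSmokeTest`, the helper `producerProps` and the message text are hypothetical.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical standalone check: publishes one record to "twitterData"
// without Spark, to confirm that the broker, topic and console consumer work.
object KafkaSmokeTest {

  // Producer settings equivalent to the ones used inside the Spark job.
  def producerProps(bootstrap: String = "localhost:9092"): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](producerProps())
    try {
      val record = new ProducerRecord[String, String]("twitterData", "smoke-test message")
      // send() is asynchronous; get() waits for the broker's acknowledgement
      // and surfaces any delivery failure as an exception.
      val meta: RecordMetadata = producer.send(record).get()
      println(s"Written to ${meta.topic()}, partition ${meta.partition()}, offset ${meta.offset()}")
    } finally {
      producer.close()
    }
  }
}
```

If this record shows up in the console consumer, Kafka itself is fine and the problem is on the Spark side.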

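While testing the code, I found that execution never reaches the statement "before data....", which I assume is why nothing is published to the topic.
Below is a sample of the Eclipse console output I get when I start the code: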

20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486242800 stored as values in memory (estimated size 88.0 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486242800 in memory on Siddhe:62039 (size: 88.0 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486242800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486242800
20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486243000 stored as values in memory (estimated size 4.6 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486243000 in memory on Siddhe:62039 (size: 4.6 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486243000 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486243000
20/01/08 17:54:04 INFO MemoryStore: Block input-0-1578486243800 stored as values in memory (estimated size 112.1 KB, free 1971.1 MB)
20/01/08 17:54:04 INFO BlockManagerInfo: Added input-0-1578486243800 in memory on Siddhe:62039 (size: 112.1 KB, free: 1971.1 MB)
20/01/08 17:54:04 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:04 WARN BlockManager: Block input-0-1578486243800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:04 INFO BlockGenerator: Pushed block input-0-1578486243800

Let me know what I am missing.
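The log shows the Twitter receiver storing blocks (`input-0-...`), so tweets do arrive, yet the `foreachPartition` body never runs. That pattern matches a pitfall described in the Spark Streaming programming guide: when running locally, the master should not be `local` or `local[1]`, because a receiver-based input stream (such as `TwitterUtils.createStream`) occupies one thread and leaves none for processing the received data. A minimal sketch of a configuration that avoids this, assuming the rest of the job stays as posted; `StreamingConf` and `build` are hypothetical names used only for illustration.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: the question's configuration, but with two local threads.
// A receiver-based stream keeps one thread busy for the whole run, so with
// master "local" (a single thread) no thread is left for the batch tasks,
// including the body of foreachPartition.
object StreamingConf {
  def build(appName: String = "TwitterData"): SparkConf =
    new SparkConf()
      .setMaster("local[2]") // n must exceed the number of receivers (one here)
      .setAppName(appName)
}
```

It could be used as `val ssc = new StreamingContext(StreamingConf.build(), Seconds(10))`; in the posted code the equivalent change is `conf.set("spark.master", "local[2]")`. If this is the cause, "before data...." should start appearing once the next batch is processed.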
