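I am trying a basic use case: streaming Twitter data into a Kafka topic. After going through various posts I was finally able to put the code together, but unfortunately I can't read any messages with a consumer. Below is my Spark code: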
```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

object Testing {
  def main(args: Array[String]): Unit = {
    val appName = "TwitterData"
    val conf = new SparkConf()
    conf.set("spark.master", "local") // "local" = a single worker thread
    conf.set("spark.app.name", appName)
    val sc = new SparkContext(conf)

    // Create the streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))

    // Twitter API credentials
    val consumerKey = ""       // your consumer key
    val consumerSecret = ""    // your API secret
    val accessToken = ""       // your access token
    val accessTokenSecret = "" // your token secret

    // Connect to the Twitter API
    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
    val auth = new OAuthAuthorization(cb.build)
    val tweets = TwitterUtils.createStream(ssc, Some(auth))

    // Keep English tweets and project them to (text, name, screen name, created-at)
    val englishTweets = tweets.filter(_.getLang() == "en")
    val statuses = englishTweets.map(status =>
      (status.getText(), status.getUser.getName(), status.getUser.getScreenName(), status.getCreatedAt.toString))

    statuses.foreachRDD { (rdd, time) =>
      print("INSIDE ForEACH")
      // One producer per partition, created where the partition is processed
      rdd.foreachPartition { partitionIter =>
        val props = new Properties()
        val bootstrap = "localhost:9092" // your external IP of the GCP VM, e.g. 10.0.0.1:9092
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("bootstrap.servers", bootstrap)
        val producer = new KafkaProducer[String, String](props)
        partitionIter.foreach { elem =>
          val dat = elem.toString()
          println("before data....")
          print(dat)
          // "twitterData" is the name of the Kafka topic; the key is left null
          val data = new ProducerRecord[String, String]("twitterData", null, dat)
          producer.send(data)
        }
        producer.flush()
        producer.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
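If records do reach the topic, each message value is the `toString()` of the 4-tuple built in `statuses`. This standalone sketch, with made-up sample values, shows what the console consumer would then print. Note that Scala's tuple `toString` joins fields with bare commas, so a comma inside the tweet text makes the fields ambiguous. `RecordFormatDemo` and `toRecordValue` are illustrative names, not part of the original code.

```scala
object RecordFormatDemo {
  // Mirrors the tuple built in `statuses`: (text, user name, screen name, createdAt string).
  // The loop in the question sends elem.toString() as the Kafka message value.
  def toRecordValue(elem: (String, String, String, String)): String = elem.toString()

  def main(args: Array[String]): Unit = {
    // Made-up sample values, not real tweet data
    val sample = ("hello, world", "Alice", "alice", "Wed Jan 08 17:54:03 IST 2020")
    println(toRecordValue(sample)) // prints (hello, world,Alice,alice,Wed Jan 08 17:54:03 IST 2020)
  }
}
```

If the fields need to be parsed downstream, a delimiter that cannot appear in tweet text, or a structured format such as JSON, would avoid the ambiguity.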
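I start the consumer with the command below to read the messages, but I can't read anything. It doesn't throw any error, but no messages show up in the consumer window: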
```
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic twitterData
```
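I tried debugging my code and found that execution never reaches the statement that prints "before data....".
I think that's why nothing gets published to the topic.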
Below is a sample of the Eclipse console output I get when I start running this code:
```
20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486242800 stored as values in memory (estimated size 88.0 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486242800 in memory on Siddhe:62039 (size: 88.0 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486242800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486242800
20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486243000 stored as values in memory (estimated size 4.6 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486243000 in memory on Siddhe:62039 (size: 4.6 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486243000 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486243000
20/01/08 17:54:04 INFO MemoryStore: Block input-0-1578486243800 stored as values in memory (estimated size 112.1 KB, free 1971.1 MB)
20/01/08 17:54:04 INFO BlockManagerInfo: Added input-0-1578486243800 in memory on Siddhe:62039 (size: 112.1 KB, free: 1971.1 MB)
20/01/08 17:54:04 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:04 WARN BlockManager: Block input-0-1578486243800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:04 INFO BlockGenerator: Pushed block input-0-1578486243800
```
Please let me know what I'm missing.
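A likely explanation, offered as a hedged suggestion rather than a confirmed answer: the Spark Streaming Programming Guide warns against using `local` or `local[1]` as the master URL with a receiver-based input such as `TwitterUtils.createStream`. The receiver permanently occupies the only task thread, so no thread is left to process the received batches. That matches the symptoms: the log shows the receiver storing `input-0-...` blocks, yet the `foreachPartition` body (and hence the "before data...." print) never runs. The sketch below encodes that rule. `localThreads` and `canProcessBatches` are hypothetical helpers for illustration, not Spark APIs, and only the plain `local`, `local[N]` and `local[*]` master forms are handled.

```scala
object ReceiverCoreCheck {
  // Hypothetical helper: number of task threads implied by a local master URL.
  def localThreads(master: String): Int = {
    val LocalN = """local\[(\d+)\]""".r
    master match {
      case "local"    => 1
      case "local[*]" => Runtime.getRuntime.availableProcessors()
      case LocalN(n)  => n.toInt
      case other      => throw new IllegalArgumentException(s"unsupported master URL: $other")
    }
  }

  // Each receiver pins one task thread for the lifetime of the stream,
  // so batch processing needs at least one thread beyond the receivers.
  def canProcessBatches(master: String, receivers: Int): Boolean =
    localThreads(master) > receivers

  def main(args: Array[String]): Unit = {
    println(canProcessBatches("local", 1))    // false: the Twitter receiver takes the only thread
    println(canProcessBatches("local[2]", 1)) // true: one thread left for processing batches
  }
}
```

If this is the cause, changing the master in the question's code to `local[2]` or `local[*]` (for example `conf.set("spark.master", "local[*]")`) should let the batches run; this has not been verified against the author's environment.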