WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s

vjrehmav · posted on 2021-05-22 in Spark

I am trying to combine Kafka, Spark Streaming, and Twitter.
I receive the warning messages below and would like to find a solution. If they were error messages I think I could track down a fix more easily, but I cannot find anything that addresses these warnings.

20/10/14 09:09:27 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/10/14 09:09:27 WARN BlockManager: Block input-0-1602634166800 replicated to only 0 peer(s) instead of 1 peers

I used the code shown below in Apache Spark (spark-shell).

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.Status
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val appName = "TwitterData"
val ssc = new StreamingContext(sc, Seconds(10))
val consumerKey = "consumerKey"
val consumerSecret = "consumerSecret"
val accessToken = "accessToken"
val accessTokenSecret = "accessTokenSecret"

val cb = new ConfigurationBuilder  
cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey).setOAuthConsumerSecret(consumerSecret).setOAuthAccessToken(accessToken).setOAuthAccessTokenSecret(accessTokenSecret)
val auth = new OAuthAuthorization(cb.build)
val tweets = TwitterUtils.createStream(ssc, Some(auth))
val XRPTweets = tweets.filter(_.getText.contains("XRP")) // filter tweets mentioning XRP; getLang returns a language code such as "en", not a topic
val statuses = XRPTweets.map(status => (status.getText(),status.getUser.getName(),status.getUser.getScreenName(),status.getCreatedAt.toString))

statuses.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIter =>
    // one producer per partition, created on the executor
    val props = new Properties()
    val bootstrap = "localhost:9092"
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("bootstrap.servers", bootstrap)
    val producer = new KafkaProducer[String, String](props)
    partitionIter.foreach { elem =>
      val dat = elem.toString()
      val data = new ProducerRecord[String, String]("XRP", null, dat)
      producer.send(data)
    }
    producer.flush()
    producer.close()
  }
} // close foreachRDD (this brace was missing in the original snippet)

ssc.start()

How can I fix these warning messages and get results? I have tried searching on Google, but there is not much information about integrating Kafka and Spark with Twitter that mentions this warning.
Thank you.
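A note on what the warning means: the receiver's default storage level, `StorageLevel.MEMORY_AND_DISK_SER_2`, asks Spark to keep one replica of each received block, but a single-executor setup (such as spark-shell on one machine) has no peer to replicate to, hence "Expecting 1 replicas with only 0 peer/s". A minimal sketch of one possible workaround, assuming the `ssc` and `auth` values defined above: pass a non-replicated storage level to `TwitterUtils.createStream` (the third argument is the keyword-filter list, left empty here).

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils

// MEMORY_AND_DISK_SER (without the "_2" suffix) requests no replica,
// so the RandomBlockReplicationPolicy warning should no longer fire
// on a single-node deployment.
val tweets = TwitterUtils.createStream(
  ssc,                             // existing StreamingContext
  Some(auth),                      // existing OAuthAuthorization
  Nil,                             // no keyword filters
  StorageLevel.MEMORY_AND_DISK_SER // store blocks without replication
)
```

On a real multi-executor cluster the replicated default is usually preferable for fault tolerance, so this change is a single-machine workaround rather than a general fix.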

No answers yet!
