我使用CreateDirectStream来集成sparkstreaming和kafka。以下是我使用的代码:
val ssc = new StreamingContext(new SparkConf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
val topics = Set("topic1")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
现在我想把消息存储到hdfs中。这样做对吗?
messages.saveAsTextFiles("/tmp/spark/messages")
1条答案
按热度按时间relj7zay1#
saveAsTextFiles("/tmp/spark/messages")
-这将把数据持久保存在本地文件系统中,如果提供的文件夹结构(“/tmp/spark/messages”)是本地hdfs的一部分,那么它也将显示在hdfs目录中,因为saveAsTextFiles
利用相同的mapereduceapi来编写输出。上述方法适用于spark执行器和hdfs在同一台物理机器上的情况,但是如果hdfs目录或url不同并且不在同一台机器上,执行器正在运行,那么这将不起作用。
如果您需要确保您的数据在hdfs中持久化,那么作为一个好的实践,您应该始终提供完整的hdfs url。像这样的-
saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages")
或者您也可以利用以下任一方法:-DStream.saveAsNewAPIHadoopFiles()
DStream.saveAsHadoopFiles(<HDFS URL with Location>)