kafka到sparkstreaming到hdfs

xt0899hw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(318)

我使用CreateDirectStream来集成sparkstreaming和kafka。以下是我使用的代码:

val ssc = new StreamingContext(new SparkConf, Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
    val topics = Set("topic1")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

现在我想把消息存储到hdfs中。这样做对吗?

messages.saveAsTextFiles("/tmp/spark/messages")
relj7zay

relj7zay1#

saveAsTextFiles("/tmp/spark/messages") -这将把数据持久保存在本地文件系统中,如果提供的文件夹结构(“/tmp/spark/messages”)是本地hdfs的一部分,那么它也将显示在hdfs目录中,因为 saveAsTextFiles 利用相同的mapereduceapi来编写输出。
上述方法适用于spark执行器和hdfs在同一台物理机器上的情况,但是如果hdfs目录或url不同并且不在同一台机器上,执行器正在运行,那么这将不起作用。
如果您需要确保您的数据在hdfs中持久化,那么作为一个好的实践,您应该始终提供完整的hdfs url。像这样的- saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages") 或者您也可以利用以下任一方法:-
DStream.saveAsNewAPIHadoopFiles() DStream.saveAsHadoopFiles(<HDFS URL with Location>)

相关问题