在单个文件n spark scala中写入数据

q8l4jmvw  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(447)

我正在尝试使用spark scala在单个文件中写入数据:

while (loop > 0) {
      val getReq = new HttpGet(ww.url.com)

      val httpResponse = client.execute(getReq)
      val data = Source.fromInputStream(httpResponse.getEntity.getContent()).getLines.mkString

      val parser = JSON.parseFull(data)
      val globalMap = parser.get.asInstanceOf[Map[String, Any]]
      val reviewMap = globalMap.get("payload").get.asInstanceOf[Map[String, Any]]

      val df = context.sparkContext.parallelize(Seq(reviewMap.get("records").get.toString())).toDF()
      if (startIndex == 0) {
        df.coalesce(1).write.mode(SaveMode.Overwrite).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")
      } else {
        df.coalesce(1).write.mode(SaveMode.Append).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")

      }

      startIndex = startIndex + limit
      loop = loop - 1
      httpResponse.close()
    }

创建的文件数是循环数,我只想创建一个文件。它也在创建crc文件,我想删除这些:我尝试了下面的配置,但它只停止创建成功的文件:

.config("dfs.client.read.shortcircuit.skip.checksum", "true")
      .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .config("fs.file.impl.disable.cache", true)

有没有办法只创建一个没有crc和success文件的文件?

arknldoa

arknldoa1#

回复:“创建的文件数是循环数”
即使在代码中使用df.coalesce(1),它的执行次数仍然与运行while循环的次数相同。
我只想从您的代码创建一个文件,似乎您正试图调用httpget请求到某个url,并在解析后保存内容。如果这个理解是正确的,那么我相信您不应该使用while循环来完成这个任务。有 map 可以按以下方式使用的转换。
请在下面找到psuedo代码以供参考。

val urls = List("a.com","b.com","c.com")
    val sourcedf = sparkContext.parallelize(urls).toDF
    //this could be map or flatMap based on your requirement.
    val yourprocessedDF = sourcedf.map(<< do your parsing here and emit data>>)
    yourprocessedDF.repartition(1).write(<<whichever format you need>>)

相关问题