我正在尝试使用spark scala在单个文件中写入数据:
while (loop > 0) {
val getReq = new HttpGet(ww.url.com)
val httpResponse = client.execute(getReq)
val data = Source.fromInputStream(httpResponse.getEntity.getContent()).getLines.mkString
val parser = JSON.parseFull(data)
val globalMap = parser.get.asInstanceOf[Map[String, Any]]
val reviewMap = globalMap.get("payload").get.asInstanceOf[Map[String, Any]]
val df = context.sparkContext.parallelize(Seq(reviewMap.get("records").get.toString())).toDF()
if (startIndex == 0) {
df.coalesce(1).write.mode(SaveMode.Overwrite).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")
} else {
df.coalesce(1).write.mode(SaveMode.Append).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")
}
startIndex = startIndex + limit
loop = loop - 1
httpResponse.close()
}
创建的文件数是循环数,我只想创建一个文件。它也在创建crc文件,我想删除这些:我尝试了下面的配置,但它只停止创建成功的文件:
.config("dfs.client.read.shortcircuit.skip.checksum", "true")
.config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.config("fs.file.impl.disable.cache", true)
有没有办法只创建一个没有crc和success文件的文件?
1条答案
按热度按时间arknldoa1#
回复:“创建的文件数是循环数”
即使在代码中使用df.coalesce(1),它的执行次数仍然与运行while循环的次数相同。
我只想从您的代码创建一个文件,似乎您正试图调用httpget请求到某个url,并在解析后保存内容。如果这个理解是正确的,那么我相信您不应该使用while循环来完成这个任务。有
map
可以按以下方式使用的转换。请在下面找到psuedo代码以供参考。