我在hdfs中有一个parquet文件作为数据的初始加载。接下来的所有Parquet只是这些数据集,这些数据集每天都会更改为初始负载(按时间顺序)。这是我的三角洲。我想阅读所有或几个Parquet文件有一个特定日期的最新数据。delta也可以包含新记录。
例子:
初始数据(文件夹:/path/spezific\u data/20180101):
ID| Name | Street |
1 | "Tom" |"Street 1"|
2 | "Peter"|"Street 2"|
增量1(文件夹:/path/spezific\u data/20180102):
ID| Name | Street |
1 | "Tom" |"Street 21"|
delta 2(文件夹::/path/spezific_data/20180103):
ID| Name | Street |
2 | "Peter" |"Street 44"|
3 | "Hans" | "Street 12"|
delta 3(文件夹::/path/spezific_data/20180105):
ID| Name | Street |
2 | "Hans" |"Street 55"|
有可能某一天有增量,但在第二天加载(看看delta 2和delta 3),文件夹/path/spezific_data/20180104确实存在,我们永远不想加载这个日期。现在我想装不同的箱子。
只有初始数据:这是一个简单的目录加载。
initial = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
直到特定日期(20180103)
initial_df = spark.read.parquet("hdfs:/path/spezific_data/20180101/") <br>
delta_df = spark.read.parquet("hdfs:/path/spezific_data/20180102/")
现在我必须合并(“更新”我知道spark RDD或dataframes不能做更新)这些数据集,另一个也要合并。目前我用这行代码来解决这个问题(但是在for循环中):
new_df = delta_df.union(initila_df).dropDuplicates("ID") <br>
delta_df = spark.read.parqeut("hdfs:/mypath/20180103/") <br>
new_df = delta_df.union(new_df).dropDuplicates("ID") <br>
但我认为这不是一个好办法。
加载文件夹“/path/spezific\u data”中的所有数据,就像第一步一样,我使用for循环直到最晚的日期
问题:我能这样做吗?有更好的方法吗?我能把这个放在一个df里合并吗?
目前装载时间很长(一小时)
更新1:
我试过这样做。如果我运行这个代码,它会遍历所有日期直到我的enddate(我在println(date)上看到这个)。之后,我得到一个java.lang.StackOverflower错误。错误在哪里?
import org.apache.spark.sql.functions.col
import util.control.Breaks._
var sourcePath = "hdfs:sourceparth/"
var destinationPath = "hdfs:destiantionpath/result"
var initial_date = "20170427"
var start_year = 2017
var end_year = 2019
var end_month = 10
var end_day = 31
var m : String = _
var d : String = _
var date : String = _
var delta_df : org.apache.spark.sql.DataFrame = _
var doubleRows_df : org.apache.spark.sql.DataFrame = _
//final DF, initial load
var final_df = spark.read.parquet(sourcePath + initial_date + "*")
breakable{
for(year <- 2017 to end_year; month <- 1 to 12; day <- 1 to 31){
//Create date String
m = month.toString()
d = day.toString()
if(month < 10)
m = "0" + m
if(day < 10)
d = "0" + d
date = year.toString() + m + d
try{
//one delta
delta_df = spark.read.parquet(sourcePath + date + "*")
//delete double Rows (i want to ignore them
doubleRows_df = delta_df.groupBy("key").count().where("count > 1").select("key")
delta_df = delta_df.join(doubleRows_df, Seq("key"), "leftanti")
//deletes all (old) rows in final_df, that are in delta_df
final_df = final_df.join(delta_df, Seq("key"), "leftanti")
//add all new rows in delta
final_df = final_df.union(delta_df)
println(date)
}catch{
case e:org.apache.spark.sql.AnalysisException=>{}
}
if(day == end_day && month == end_month && year == end_year)
break
}
}
final_df.write.mode("overwrite").parquet(destinationPath)
完整堆栈跟踪:
19/11/26 11:19:04 WARN util.Utils: Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:223)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:278)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:209)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
1条答案
按热度按时间v1uwarro1#
distinct
或者dropDuplicates
不是选项,因为您无法控制将采用哪些值。很有可能,新的价值不会被增加,而旧的价值会被保留。你需要做什么
join
结束ID
-请参见此处的连接类型。连接的行应该只包含旧的,或者只包含新的,或者两者都包含。当只有旧的或只有新的-你拿一个现在,当两者-你只拿新的。下面的示例说明如何一次添加多个增量。
问:在每一个类别中,什么是最畅销和第二畅销的产品?
更新1:
首先,请考虑使用日期实用程序,而不是手动迭代数字以获取日期:
请参阅此以了解更多详细信息。
第二-请张贴完整的stacktrace,而不仅仅是
StackOverflowException
.