Processing data deltas in Spark with DataFrames

dfuffjeb · published 2021-05-27 in Hadoop

I have a parquet file in HDFS as the initial load of my data. All following parquet files contain only the datasets that changed compared to the initial load, one per day (in chronological order). These are my deltas. I want to read all (or several) parquet files so that I get the latest state of the data for a specific date. Deltas can also contain new records.
Example:
Initial data (folder: /path/spezific_data/20180101):

ID| Name  | Street    | 
1 | "Tom" |"Street 1"| 
2 | "Peter"|"Street 2"|

Delta 1 (folder: /path/spezific_data/20180102):

ID| Name  | Street    | 
1 | "Tom" |"Street 21"|

Delta 2 (folder: /path/spezific_data/20180103):

ID| Name  | Street    | 
2 | "Peter" |"Street 44"|
3 | "Hans" | "Street 12"|

Delta 3 (folder: /path/spezific_data/20180105):

ID| Name  | Street    | 
2 | "Hans" |"Street 55"|

It can happen that the changes of one day are only loaded on the following day (look at delta 2 and delta 3), so the folder /path/spezific_data/20180104 never exists and we never want to load this date. Now I want to load different cases.
Only the initial data: that is a simple load of one directory.

initial = spark.read.parquet("hdfs:/path/spezific_data/20180101/")

Up to a specific date (20180103):

initial_df = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
delta_df = spark.read.parquet("hdfs:/path/spezific_data/20180102/")

Now I have to merge these datasets ("update"; I know Spark RDDs and DataFrames cannot be updated), and then merge the next delta as well. At the moment I solve this with these lines of code (but inside a for loop):

new_df = delta_df.union(initial_df).dropDuplicates("ID")
delta_df = spark.read.parquet("hdfs:/mypath/20180103/")
new_df = delta_df.union(new_df).dropDuplicates("ID")

But I don't think this is a good way to do it.
Load all data in the folder "/path/spezific_data": I do this like the first step, with a for loop up to the latest date.
Questions: Can I do it like this? Are there better ways? Can I do all of this in one DataFrame?
At the moment the load takes a very long time (one hour).
Update 1:
I tried it like this. If I run this code, it goes through all dates up to my end date (I can see this from the println(date)). After that I get a java.lang.StackOverflowError. Where is the mistake?

import org.apache.spark.sql.functions.col
import util.control.Breaks._

var sourcePath = "hdfs:sourcepath/"
var destinationPath = "hdfs:destinationpath/result"
var initial_date = "20170427"
var start_year = 2017
var end_year = 2019
var end_month = 10
var end_day = 31

var m : String = _
var d : String = _
var date : String = _
var delta_df : org.apache.spark.sql.DataFrame = _
var doubleRows_df : org.apache.spark.sql.DataFrame = _

//final DF, initial load
var final_df = spark.read.parquet(sourcePath + initial_date +  "*")

breakable{
   for(year <- start_year to end_year; month <- 1 to 12; day <- 1 to 31){
     //Create date String
     m = month.toString()
     d = day.toString()
     if(month < 10)
       m = "0" + m
     if(day < 10)
       d = "0" + d
     date = year.toString() + m + d

     try{
       //one delta
       delta_df = spark.read.parquet(sourcePath + date + "*")

       // delete duplicate rows (I want to ignore them)
       doubleRows_df  = delta_df.groupBy("key").count().where("count > 1").select("key")
       delta_df = delta_df.join(doubleRows_df, Seq("key"), "leftanti")

       //deletes all (old) rows in final_df, that are in delta_df
       final_df = final_df.join(delta_df, Seq("key"), "leftanti")

       //add all new rows in delta
       final_df = final_df.union(delta_df)

       println(date)
     }catch{
       // the folder for this date does not exist: ignore it and move on
       case e: org.apache.spark.sql.AnalysisException => {}
     }
    if(day == end_day && month == end_month &&  year == end_year)
       break
   }
 }
 final_df.write.mode("overwrite").parquet(destinationPath)

Full stack trace:

19/11/26 11:19:04 WARN util.Utils: Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
        at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
        at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:223)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:278)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
        at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
        at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:209)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
v1uwarro#1

distinct or dropDuplicates is not an option, because you cannot control which of the values will be taken. It may very well happen that the new value is dropped while the old one is kept.
What you need to do is a join on ID (see the possible join types here). A joined row then contains either only the old values, only the new values, or both. When there is only an old or only a new value, you take the one that exists; when there are both, you take only the new one.
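For a single delta, a minimal sketch of that join (assuming the key column ID and the example columns Name and Street; applyDelta is just an illustrative name) could look like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

// Merge one delta into the current state:
// - IDs only in the current state keep their old values
// - IDs only in the delta come in as new rows
// - IDs present in both take the values from the delta
def applyDelta(current: DataFrame, delta: DataFrame): DataFrame =
  current.as("cur")
    .join(delta.as("del"), Seq("ID"), "full_outer")
    .select(
      col("ID"),  // the "using" join already merges the key column
      coalesce(col("del.Name"), col("cur.Name")).as("Name"),
      coalesce(col("del.Street"), col("cur.Street")).as("Street"))

Several deltas can then be applied in chronological order, for example with deltas.foldLeft(initial_df)(applyDelta); note that a long chain of joins still grows the query plan, so checkpointing or writing intermediate results may help with the memory errors shown above.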
The example below illustrates how several deltas can be added at once.
Question: what is the best-selling and the second best-selling product in each category?

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank

val dataset = Seq(
  ("Thin",       "cell phone", 6000),
  ("Normal",     "tablet",     1500),
  ("Mini",       "tablet",     5500),
  ("Ultra thin", "cell phone", 5000),
  ("Very thin",  "cell phone", 6000),
  ("Big",        "tablet",     2500),
  ("Bendable",   "cell phone", 3000),
  ("Foldable",   "cell phone", 3000),
  ("Pro",        "tablet",     4500),
  ("Pro2",       "tablet",     6500))
  .toDF("product", "category", "revenue")

val overCategory = Window.partitionBy('category).orderBy('revenue.desc)

val ranked = dataset.withColumn("rank", dense_rank().over(overCategory))

scala> ranked.show
+----------+----------+-------+----+
|   product|  category|revenue|rank|
+----------+----------+-------+----+
|      Pro2|    tablet|   6500|   1|
|      Mini|    tablet|   5500|   2|
|       Pro|    tablet|   4500|   3|
|       Big|    tablet|   2500|   4|
|    Normal|    tablet|   1500|   5|
|      Thin|cell phone|   6000|   1|
| Very thin|cell phone|   6000|   1|
|Ultra thin|cell phone|   5000|   2|
|  Bendable|cell phone|   3000|   3|
|  Foldable|cell phone|   3000|   3|
+----------+----------+-------+----+

scala> ranked.where('rank <= 2).show
+----------+----------+-------+----+
|   product|  category|revenue|rank|
+----------+----------+-------+----+
|      Pro2|    tablet|   6500|   1|
|      Mini|    tablet|   5500|   2|
|      Thin|cell phone|   6000|   1|
| Very thin|cell phone|   6000|   1|
|Ultra thin|cell phone|   5000|   2|
+----------+----------+-------+----+
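Applied to the delta problem, the same window idea can pick the newest row per ID after stacking the initial load and the deltas. Below is a sketch under the question's folder layout; the list of dates, the load_date helper column and the rn column are illustrative names, not part of the original data:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}

// stack the initial load and all deltas, remembering which folder each row came from
val dates = Seq("20180101", "20180102", "20180103", "20180105")
val stacked = dates
  .map(d => spark.read.parquet(s"hdfs:/path/spezific_data/$d/").withColumn("load_date", lit(d)))
  .reduce(_ union _)

// keep only the newest row per ID
val newestFirst = Window.partitionBy("ID").orderBy(col("load_date").desc)
val latest = stacked
  .withColumn("rn", row_number().over(newestFirst))
  .where(col("rn") === 1)
  .drop("rn", "load_date")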

Update 1:
First, please consider using date utilities instead of manually iterating over numbers to get the dates:

import java.time.*;
import java.util.Date;
Date dt = new Date();
LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault()).plusDays(1);

See this for more details.
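In Scala (the language of the question's code), a hedged sketch using java.time could generate all folder-date strings up front; the start and end values are taken from the question's variables, and dates and fmt are names introduced here for illustration:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// walk from the start date to the end date one day at a time;
// impossible dates such as 20170230 are never produced
val fmt   = DateTimeFormatter.ofPattern("yyyyMMdd")
val start = LocalDate.parse("20170427", fmt)
val end   = LocalDate.parse("20191031", fmt)

val dates: Seq[String] = Iterator
  .iterate(start)(_.plusDays(1))
  .takeWhile(!_.isAfter(end))
  .map(_.format(fmt))
  .toSeq

Each of those strings can then be fed to the existing read-and-merge logic, still skipping dates whose folder does not exist.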
Second: please post the full stack trace, not just the StackOverflowException.
