scala—flink使用的大量内存

6jjcrrmo  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(459)

从上周开始,我在scala的flink中构建了一个数据流程序。但我有一个奇怪的行为,Flink使用了比我想象的更多的记忆。
在我的processfunction中有一个4 liststate的tuple(int,long)由int键控,我用它在不同的时间范围内得到不同的唯一计数器,我希望这个列表使用了大部分内存。
但事实并非如此。所以我打印了jvm的历史实况。我很惊讶有多少记忆被使用。

num     #instances         #bytes  class name
----------------------------------------------
   1:     138920685     6668192880  java.util.HashMap$Node
   2:     138893041     5555721640  org.apache.flink.streaming.api.operators.InternalTimer
   3:     149680624     3592334976  java.lang.Integer
   4:      48313229     3092046656  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   5:      14042723     2579684280  [Ljava.lang.Object;
   6:          4492     2047983264  [Ljava.util.HashMap$Node;
   7:      41686732     1333975424  com.myJob.flink.tupleState
   8:           201      784339688  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
   9:      17230300      689212000  com.myJob.flink.uniqStruct
  10:      14025040      561001600  java.util.ArrayList
  11:       8615581      413547888  com.myJob.flink.Data$FingerprintCnt
  12:       6142006      393088384  com.myJob.flink.ProcessCountStruct
  13:       4307549      172301960  com.myJob.flink.uniqresult
  14:       4307841      137850912  com.myJob.flink.Data$FingerprintUniq
  15:       2153904      137849856  com.myJob.flink.Data$StreamData
  16:       1984742       79389680  scala.collection.mutable.ListBuffer
  17:       1909472       61103104  scala.collection.immutable.$colon$colon
  18:         22200       21844392  [B
  19:        282624        9043968  org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  20:         59045        6552856  [C
  21:         33194        2655520  java.nio.DirectByteBuffer
  22:         32804        2361888  sun.misc.Cleaner
  23:            35        2294600  [Lscala.concurrent.forkjoin.ForkJoinTask;
  24:           640        2276352  [Lorg.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  25:         32768        2097152  org.apache.flink.core.memory.HybridMemorySegment
  26:         12291        2082448  java.lang.Class
  27:         58591        1874912  java.lang.String
  28:          8581        1372960  java.lang.reflect.Method
  29:         32790        1311600  java.nio.DirectByteBuffer$Deallocator
  30:         18537         889776  java.util.concurrent.ConcurrentHashMap$Node
  31:          4239         508680  java.lang.reflect.Field
  32:          8810         493360  java.nio.HeapByteBuffer
  33:          7389         472896  java.util.HashMap
  34:          5208         400336  [I

tuple(int,long)位于第7位的com.myjob.flink.tuplestate。我看到元组使用的内存不到2g。
我不明白为什么Flink要用这么多的内存来上课。
谁能告诉我这个行为,提前谢谢。
更新:
我在独立群集上运行作业(1个jobmanager,3个taskmanager)
flink版本是1.5快照提交:e4486ae
我在一个taskmanager节点上获取histo live。
更新2:
在我的processfunction中,我使用了:

ctx.timerService.registerProcessingTimeTimer(ctx.timestamp + 100)

之后呢 onTimer 功能,我处理我的 listState 检查所有旧数据。因此它为processfunction上的每个调用创建一个计时器。
但为什么计时器是钢的记忆后 onTimer 功能触发

gtlvzcf8

gtlvzcf81#

你最后有多少扇Windows?根据前两个条目,我们看到的是flink用来跟踪何时清理窗口的“计时器”。对于窗口中的每个键,您将以(key,endtimestamp)有效地处于计时器状态结束。如果有大量的窗口(可能是无序时间或延迟水印)或每个窗口中有大量的密钥,则每个窗口都会占用内存。
请注意,即使您使用的是rocksdb state,timerservice也会使用堆内存,因此您必须注意这一点。

相关问题