从上周开始,我在scala的flink中构建了一个数据流程序。但我有一个奇怪的行为,Flink使用了比我想象的更多的记忆。
在我的processfunction中有一个4 liststate的tuple(int,long)由int键控,我用它在不同的时间范围内得到不同的唯一计数器,我希望这个列表使用了大部分内存。
但事实并非如此。所以我打印了jvm的历史实况。我很惊讶有多少记忆被使用。
num #instances #bytes class name
----------------------------------------------
1: 138920685 6668192880 java.util.HashMap$Node
2: 138893041 5555721640 org.apache.flink.streaming.api.operators.InternalTimer
3: 149680624 3592334976 java.lang.Integer
4: 48313229 3092046656 org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
5: 14042723 2579684280 [Ljava.lang.Object;
6: 4492 2047983264 [Ljava.util.HashMap$Node;
7: 41686732 1333975424 com.myJob.flink.tupleState
8: 201 784339688 [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
9: 17230300 689212000 com.myJob.flink.uniqStruct
10: 14025040 561001600 java.util.ArrayList
11: 8615581 413547888 com.myJob.flink.Data$FingerprintCnt
12: 6142006 393088384 com.myJob.flink.ProcessCountStruct
13: 4307549 172301960 com.myJob.flink.uniqresult
14: 4307841 137850912 com.myJob.flink.Data$FingerprintUniq
15: 2153904 137849856 com.myJob.flink.Data$StreamData
16: 1984742 79389680 scala.collection.mutable.ListBuffer
17: 1909472 61103104 scala.collection.immutable.$colon$colon
18: 22200 21844392 [B
19: 282624 9043968 org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
20: 59045 6552856 [C
21: 33194 2655520 java.nio.DirectByteBuffer
22: 32804 2361888 sun.misc.Cleaner
23: 35 2294600 [Lscala.concurrent.forkjoin.ForkJoinTask;
24: 640 2276352 [Lorg.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
25: 32768 2097152 org.apache.flink.core.memory.HybridMemorySegment
26: 12291 2082448 java.lang.Class
27: 58591 1874912 java.lang.String
28: 8581 1372960 java.lang.reflect.Method
29: 32790 1311600 java.nio.DirectByteBuffer$Deallocator
30: 18537 889776 java.util.concurrent.ConcurrentHashMap$Node
31: 4239 508680 java.lang.reflect.Field
32: 8810 493360 java.nio.HeapByteBuffer
33: 7389 472896 java.util.HashMap
34: 5208 400336 [I
tuple(int,long)位于第7位的com.myjob.flink.tuplestate。我看到元组使用的内存不到2g。
我不明白为什么Flink要用这么多的内存来上课。
谁能告诉我这个行为,提前谢谢。
更新:
我在独立群集上运行作业(1个jobmanager,3个taskmanager)
flink版本是1.5快照提交:e4486ae
我在一个taskmanager节点上获取histo live。
更新2:
在我的processfunction中,我使用了:
ctx.timerService.registerProcessingTimeTimer(ctx.timestamp + 100)
之后呢 onTimer
功能,我处理我的 listState
检查所有旧数据。因此它为processfunction上的每个调用创建一个计时器。
但为什么计时器是钢的记忆后 onTimer
功能触发
1条答案
按热度按时间gtlvzcf81#
你最后有多少扇Windows?根据前两个条目,我们看到的是flink用来跟踪何时清理窗口的“计时器”。对于窗口中的每个键,您将以(key,endtimestamp)有效地处于计时器状态结束。如果有大量的窗口(可能是无序时间或延迟水印)或每个窗口中有大量的密钥,则每个窗口都会占用内存。
请注意,即使您使用的是rocksdb state,timerservice也会使用堆内存,因此您必须注意这一点。