随着越来越多的记录被处理,我的程序变得非常慢。我最初认为这是由于过度的内存消耗,因为我的程序是字符串密集型的(我使用的是java 11,所以应该尽可能使用紧凑的字符串),所以我增加了jvm堆:
-Xms2048m
-Xmx6144m
我还增加了任务管理器的内存和超时, flink-conf.yaml
:
jobmanager.heap.size: 6144m
heartbeat.timeout: 5000000
然而,这些都无助于解决这个问题。这个程序在处理了大约350万条记录之后,仍然非常慢,只剩下大约50万条了。当程序接近350万大关时,它会变得非常慢,直到最终超时,总执行时间约为11分钟。
我在visualvm中检查了内存消耗,但内存消耗从未超过约700mb。我的flink管道如下所示:
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);
environment.setParallelism(1);
DataStream<Tuple> stream = environment.addSource(new TPCHQuery3Source(filePaths, relations));
stream.process(new TPCHQuery3Process(relations)).addSink(new FDSSink());
environment.execute("FlinkDataService");
如果大部分工作是在process函数中完成的,那么我将实现数据库连接算法,并将列存储为字符串,特别是我将实现tpch基准的query 3,如果您愿意,请检查这里https://examples.citusdata.com/tpch_queries.html.
超时错误如下:
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id <id> timed out.
一旦我也犯了这个错误:
Exception in thread "pool-1-thread-1" java.lang.OutOfMemoryError: Java heap space
另外,我的visualvm监控,截图是在事情变得非常缓慢的时候拍摄的:
下面是我的源函数的运行循环:
while (run) {
readers.forEach(reader -> {
try {
String line = reader.readLine();
if (line != null) {
Tuple tuple = lineToTuple(line, counter.get() % filePaths.size());
if (tuple != null && isValidTuple(tuple)) {
sourceContext.collect(tuple);
}
} else {
closedReaders.add(reader);
if (closedReaders.size() == filePaths.size()) {
System.out.println("ALL FILES HAVE BEEN STREAMED");
cancel();
}
}
counter.getAndIncrement();
} catch (IOException e) {
e.printStackTrace();
}
});
}
我基本上读取了我需要的3个文件的每一行,根据文件的顺序,我构造了一个tuple对象,它是我的自定义类tuple,表示表中的一行,如果它是有效的,则发出这个tuple,即fullfils在日期上的特定条件。
我还建议jvm在第一百万、一百五十万、两百万和二百五十万条记录中进行垃圾收集,如下所示:
System.gc()
关于如何优化这个有什么想法吗?
2条答案
按热度按时间4jb9z9bj1#
这些是我在link独立集群上为计算tpc-h查询03而更改的属性。
我实现了这个查询,只对order表进行流式处理,并将其他表作为一个状态。另外,我将计算作为一个无窗口查询,我认为这更有意义,而且速度更快。
这里的UDF是:ordersource、orderkeyedbycustomerprocessfunction、shippingprioritykeyedprocessfunction和sumshippingpriorityitem。我用的是
com.google.common.collect.ImmutableList
因为状态不会被更新。另外,我只保留必要的专栏,如ImmutableList<Tuple2<Long, Double>> lineItemList
.new9mtju2#
字符串
intern()
救了我。我在把每根线都存储在Map上之前都做过实习,效果很不错。