在我的 Spark Streaming 应用程序中，我需要启动 400 个接收器（receiver），但启动这些接收器需要很长时间。这导致了一个问题：大量待处理的数据在队列中积压，在应用程序的运行时间到期后无法得到处理。
代码:
// Build this receiver's private view of every data file, then create the receiver.
// NOTE(review): `i` is declared by the enclosing loop, whose header is outside this
// excerpt — presumably the receiver/vuser index in [0, vusers); confirm against the caller.
Map<DataFile, List<String>> blockDataFileMap = new HashMap<>();
if (dataFileMap != null) {
    // Cut every data file into `block` groups of `vusers` lines and pick one line per group.
    for (Entry<DataFile, List<String>> entry : dataFileMap.entrySet()) {
        DataFile dataFile = entry.getKey();
        List<String> lines = entry.getValue();
        if (lines.size() < vusers) {
            // Fewer lines than vusers: no slicing, the whole list is shared as-is.
            LOG.warn(dataFile + " is too small to split. Whole file will be used for each vuser.");
            blockDataFileMap.put(dataFile, lines);
        } else {
            // From group j take the line at offset indexList[j][i], i.e. column i of indexList.
            List<String> selectedLines = new ArrayList<>();
            int block = lines.size() / vusers;
            for (int j = 0; j < block; j++) {
                // The index is computed inside the body so indexList.get(j) is only read for
                // j < block. Computing it in the for-update clause read indexList.get(block)
                // after the last pass and could throw IndexOutOfBoundsException when
                // indexList has exactly `block` rows.
                int k = j * vusers + indexList.get(j).get(i);
                selectedLines.add(lines.get(k));
            }
            // Leftover lines past the last full group: take the i-th one if it exists.
            if (block * vusers + i < lines.size()) {
                selectedLines.add(lines.get(block * vusers + i));
            }
            blockDataFileMap.put(dataFile, selectedLines);
        }
    }
}
// One receiver per pass of the enclosing loop; its input stream is appended to `list`.
// NOTE(review): each receiver is handed its own blockDataFileMap plus a wrapped copy of
// conf. If these are large, shipping that state per receiver may contribute to slow
// receiver start-up — an assumption to verify by profiling, not something this code shows.
StreamingReceiver receiver = new StreamingReceiver(StorageLevel.MEMORY_AND_DISK(), batchTime,
        blockDataFileMap, oneOffdataFiles, reportId, serverUrl, new SerializableConfiguration(conf),
        serviceWrapper);
list.add(jssc.receiverStream(receiver));
}
@SuppressWarnings("unchecked")
JavaDStream<Result>[] javaDStreams = (JavaDStream<Result>[]) new JavaDStream<?>[list.size()];
int len = list.size();
for (int i = 0; i < len; i++) {
javaDStreams[i] = list.get(i);
}
JavaDStream<Result> lines = jssc.union(javaDStreams);
// Key every Result by the value of getChain(): the stream becomes (chainId, Result) pairs.
PairFunction<Result, String, Result> keyByChain = new PairFunction<Result, String, Result>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Result> call(Result record) throws Exception {
        String chainId = record.getChain();
        return new Tuple2<>(chainId, record);
    }
};
JavaPairDStream<String, Result> pointRdd = lines.mapToPair(keyByChain);
// Pipeline: group Results per chain (groupByKey shuffles), bucket each chain's Results by
// time, hand the buckets to CalculateFunction, then fold every batch's histograms into
// histogramFinalResultMap.
pointRdd.groupByKey().map(
        new Function<Tuple2<String, Iterable<Result>>, Tuple2<String, Iterator<Tuple2<Long,
                Iterator<Result>>>>>() {
            private static final long serialVersionUID = 5928985493323358322L;

            /**
             * Buckets one chain's results by the value of {@code getTime()}.
             *
             * @param v1 (chainId, all Results of that chain)
             * @return (chainId, iterator over (time, Results carrying that time))
             */
            @Override
            public Tuple2<String, Iterator<Tuple2<Long, Iterator<Result>>>> call(
                    Tuple2<String, Iterable<Result>> v1) throws Exception {
                Map<Long, List<Result>> byTime = new HashMap<>();
                for (Result result : v1._2) {
                    Long time = result.getTime();
                    List<Result> bucket = byTime.get(time);
                    if (bucket == null) {
                        // First Result seen for this time value.
                        bucket = new ArrayList<Result>();
                        byTime.put(time, bucket);
                    }
                    bucket.add(result);
                }
                List<Tuple2<Long, Iterator<Result>>> grouped = new ArrayList<>();
                for (Entry<Long, List<Result>> bucket : byTime.entrySet()) {
                    grouped.add(new Tuple2<>(bucket.getKey(), bucket.getValue().iterator()));
                }
                return new Tuple2<>(v1._1, grouped.iterator());
            }
        }).mapPartitions(new CalculateFunction(broadcast))
        .foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, Map<String, Map<Long, Integer>>>>>() {
            /**
             * Collects one batch's (chain, tag -> (rt -> count)) tuples and merges them into
             * histogramFinalResultMap, summing the counts of keys that are already present.
             */
            @Override
            public void call(JavaRDD<Tuple2<String, Map<String, Map<Long, Integer>>>> tuple2JavaRDD)
                    throws Exception {
                List<Tuple2<String, Map<String, Map<Long, Integer>>>> collected = tuple2JavaRDD.collect();
                for (Tuple2<String, Map<String, Map<Long, Integer>>> partial : collected) {
                    String chain = partial._1();
                    Map<String, Map<Long, Integer>> incomingByTag = partial._2();
                    if (!histogramFinalResultMap.containsKey(chain)) {
                        // Unknown chain: adopt the incoming per-tag histograms as they are.
                        histogramFinalResultMap.put(chain, incomingByTag);
                        continue;
                    }
                    Map<String, Map<Long, Integer>> mergedByTag = histogramFinalResultMap.get(chain);
                    for (Entry<String, Map<Long, Integer>> tagEntry : incomingByTag.entrySet()) {
                        String tag = tagEntry.getKey();
                        Map<Long, Integer> incoming = tagEntry.getValue();
                        if (!mergedByTag.containsKey(tag)) {
                            // Unknown tag within a known chain: adopt the incoming histogram.
                            mergedByTag.put(tag, incoming);
                            continue;
                        }
                        Map<Long, Integer> merged = mergedByTag.get(tag);
                        for (Entry<Long, Integer> bin : incoming.entrySet()) {
                            Long rt = bin.getKey();
                            Integer count = bin.getValue();
                            if (!merged.containsKey(rt)) {
                                merged.put(rt, count);
                            } else {
                                merged.put(rt, count + merged.get(rt));
                            }
                        }
                    }
                }
            }
        });
// Run the streaming job for at most timeLimit seconds, then release the context.
try {
    jssc.start();
    // 1000L forces long arithmetic so a large timeLimit cannot overflow int before the call.
    jssc.awaitTerminationOrTimeout(timeLimit * 1000L);
} finally {
    // Close even when start/await throws, so the streaming context is not leaked.
    jssc.close();
}
在我的应用程序中，我会初始化 400 个接收器。
我不知道为什么接收器启动得这么慢（请给出一些可能的原因）。
暂无答案!
目前还没有任何答案,快来回答吧!