Spark Streaming 接收器启动很慢?

wj8zmpe1  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(293)

在我的 Spark Streaming 应用程序中,我需要启动400个接收器,但接收器的启动耗时很长,导致大量待处理的数据排队积压,直到应用程序的运行时间到期也没能处理完。

代码:

Map<DataFile, List<String>> blockDataFileMap = new HashMap<>();
            if (dataFileMap != null) {
                // Build the slice of every data file that belongs to index i
                // (presumably the current vuser; i is declared outside this snippet).
                for (Entry<DataFile, List<String>> entry : dataFileMap.entrySet()) {
                    DataFile dataFile = entry.getKey();
                    List<String> lines = entry.getValue();
                    if (lines.size() < vusers) {
                        // Fewer lines than vusers: no split, the whole file is reused as-is.
                        LOG.warn(dataFile + " is too small to split. Whole file will be used for each vuser.");
                        blockDataFileMap.put(dataFile, lines);
                    } else {
                        // The file is treated as `block` consecutive groups of `vusers` lines. From
                        // group j this slice takes the line at j * vusers + indexList.get(j).get(i).
                        List<String> selectedLines = new ArrayList<>();
                        int block = lines.size() / vusers;
                        // The index is computed inside the body so that indexList.get(j) is only
                        // evaluated for j < block. Computing it in the loop's update clause also
                        // evaluated it for j == block (after the final j++), which throws when
                        // indexList has exactly `block` rows.
                        for (int j = 0; j < block; j++) {
                            int k = j * vusers + indexList.get(j).get(i);
                            selectedLines.add(lines.get(k));
                        }
                        // Lines left over after the last full group: take the i-th one if it exists.
                        if (block * vusers + i < lines.size()) {
                            selectedLines.add(lines.get(block * vusers + i));
                        }
                        blockDataFileMap.put(dataFile, selectedLines);
                    }
                }
            }
            StreamingReceiver receiver = new StreamingReceiver(StorageLevel.MEMORY_AND_DISK(), batchTime,
                    blockDataFileMap, oneOffdataFiles, reportId, serverUrl, new SerializableConfiguration(conf),
                    serviceWrapper);
            list.add(jssc.receiverStream(receiver));
        }

        // Copy the registered streams into an array so they can be handed to union() in one call.
        int len = list.size();
        // Generic arrays cannot be created directly, hence the wildcard array plus unchecked cast.
        @SuppressWarnings("unchecked")
        JavaDStream<Result>[] javaDStreams = (JavaDStream<Result>[]) new JavaDStream<?>[len];
        int slot = 0;
        while (slot < len) {
            javaDStreams[slot] = list.get(slot);
            slot++;
        }
        // Single stream carrying the output of every receiver.
        JavaDStream<Result> lines = jssc.union(javaDStreams);

        // Key every Result by its chain: the stream becomes (chainId, Result) pairs.
        JavaPairDStream<String, Result> pointRdd = lines.mapToPair(new PairFunction<Result, String, Result>() {
            private static final long serialVersionUID = 1L;

            /**
             * Pairs a result with the value returned by its {@code getChain()}.
             *
             * @param result the element to key
             * @return (chain, result)
             */
            @Override
            public Tuple2<String, Result> call(Result result) throws Exception {
                String chainId = result.getChain();
                return new Tuple2<String, Result>(chainId, result);
            }
        });

        // shuffle when groupByKey
        pointRdd.groupByKey().map(
                new Function<Tuple2<String, Iterable<Result>>, Tuple2<String, Iterator<Tuple2<Long,
                        Iterator<Result>>>>>() {

                    private static final long serialVersionUID = 5928985493323358322L;

                    /**
                     * Buckets the results of one chain by their {@code getTime()} value.
                     *
                     * @param v1 (chainId, all results grouped under that chain)
                     * @return (chainId, iterator over (time, iterator over the results sharing that time));
                     *         bucket order follows the backing {@code HashMap}'s iteration order
                     */
                    @Override
                    public Tuple2<String, Iterator<Tuple2<Long, Iterator<Result>>>> call(
                            Tuple2<String, Iterable<Result>> v1) throws Exception {
                        Map<Long, List<Result>> resultsByTime = new HashMap<>();
                        for (Result result : v1._2) {
                            Long time = result.getTime();
                            List<Result> sameTime = resultsByTime.get(time);
                            if (sameTime == null) {
                                // first result seen for this time value: open a new bucket
                                sameTime = new ArrayList<Result>();
                                resultsByTime.put(time, sameTime);
                            }
                            sameTime.add(result);
                        }

                        // Expose each bucket as a (time, iterator) pair.
                        List<Tuple2<Long, Iterator<Result>>> buckets = new ArrayList<>();
                        for (Entry<Long, List<Result>> bucket : resultsByTime.entrySet()) {
                            buckets.add(new Tuple2<>(bucket.getKey(), bucket.getValue().iterator()));
                        }
                        return new Tuple2<>(v1._1, buckets.iterator());
                    }
                }).mapPartitions(new CalculateFunction(broadcast))
                .foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, Map<String, Map<Long, Integer>>>>>() {
                    /**
                     * Merges the histograms of one batch into {@code histogramFinalResultMap}.
                     *
                     * Each collected tuple is (chain, tag -> (rt -> count)). A chain or tag that is not
                     * yet present in the final result is stored as-is (the same map instance, not a
                     * copy); otherwise the counts are added per rt key.
                     *
                     * @param tuple2JavaRDD the batch RDD of (chain, histogram-by-tag) tuples
                     */
                    @Override
                    public void call(JavaRDD<Tuple2<String, Map<String, Map<Long, Integer>>>> tuple2JavaRDD)
                            throws Exception  {
                        // collect() materialises the whole batch as a local list.
                        List<Tuple2<String, Map<String, Map<Long, Integer>>>> batchResults =
                                tuple2JavaRDD.collect();
                        for (Tuple2<String, Map<String, Map<Long, Integer>>> batchEntry : batchResults) {
                            String chain = batchEntry._1();
                            Map<String, Map<Long, Integer>> batchByTag = batchEntry._2();

                            // First time this chain shows up: adopt its histograms wholesale.
                            if (!histogramFinalResultMap.containsKey(chain)) {
                                histogramFinalResultMap.put(chain, batchByTag);
                                continue;
                            }

                            Map<String, Map<Long, Integer>> totalsByTag = histogramFinalResultMap.get(chain);
                            for (Entry<String, Map<Long, Integer>> tagEntry : batchByTag.entrySet()) {
                                String tag = tagEntry.getKey();
                                Map<Long, Integer> batchCounts = tagEntry.getValue();

                                // First time this tag shows up for the chain: adopt its histogram.
                                if (!totalsByTag.containsKey(tag)) {
                                    totalsByTag.put(tag, batchCounts);
                                    continue;
                                }

                                // Known chain and tag: add the batch counts bucket by bucket.
                                Map<Long, Integer> totals = totalsByTag.get(tag);
                                for (Entry<Long, Integer> bucket : batchCounts.entrySet()) {
                                    Long rt = bucket.getKey();
                                    Integer count = bucket.getValue();
                                    if (!totals.containsKey(rt)) {
                                        totals.put(rt, count);
                                    } else {
                                        totals.put(rt, count + totals.get(rt));
                                    }
                                }
                            }
                        }
                    }
                });
        // Run the streaming job for at most timeLimit * 1000 ms, then release the context.
        try {
            jssc.start();
            // 1000L forces long arithmetic: with an int timeLimit, timeLimit * 1000 would
            // silently overflow for limits above Integer.MAX_VALUE / 1000.
            jssc.awaitTerminationOrTimeout(timeLimit * 1000L);
        } finally {
            // Close even when start/await throws, so the context is not leaked.
            jssc.close();
        }

在我的应用程序中,我将初始化400个接收器。
我不知道为什么接收器启动得这么慢(请给我一些可能的原因)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题