rxandroid:知道单个操作何时完成以及所有操作何时完成

6tqwzwtp  于 2021-07-09  发布在  Java
关注(0)|答案(2)|浏览(447)

我是rxjava的新手,我已经通过改造实现了rxjava,以便使用flatmap同时下载多个文件。我正在成功接收单个下载的oncomplete状态。但我无法实现接收所有下载的完成状态的功能。
以下是我多次下载时使用的代码:

private void downloadFile(String url, final File parentFolder) {
    final Uri uri = Uri.parse(url);
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(url.replace(uri.getLastPathSegment(),""))
            .client(new OkHttpClient.Builder().build())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create()).build();
    RestApi restApi = retrofit.create(RestApi.class);
    restApi.downloadFile(url)
            .flatMap(new Func1<Response<ResponseBody>, Observable<File>>() {
                @Override
                public Observable<File> call(final Response<ResponseBody> responseBodyResponse) {
                    return Observable.fromCallable(new Callable<File>() {
                        @Override
                        public File call() throws Exception {
                            File file = new File(parentFolder.getAbsoluteFile(), uri.getLastPathSegment());

                            if (!file.exists()) {
                                file.createNewFile();

                                BufferedSink sink = Okio.buffer(Okio.sink(file));
                                sink.writeAll(responseBodyResponse.body().source());
                                sink.close();
                            }
                            return file;
                        }
                    });
                }
            },3)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<File>() {

                @Override
                public void onCompleted() {
                    dManager.logMe(TAG,"file download complete");
                }

                @Override
                public void onError(Throwable e) {
                    dManager.logMe(TAG,"file download error");
                    e.printStackTrace();
                }

                @Override
                public void onNext(File file) {
                    dManager.logMe(TAG,"file download onNext: " + file.getAbsolutePath());
                }
            });
}

我打电话来 downloadFile(String url, final File parentFolder 在许多for循环中,只要我收到url信息。提前谢谢。

vatpfxk5

vatpfxk51#

如果你有 List<String> URL数目:

Observable.from(urls)
            .flatMap(url -> restApi.downloadFile(url))
            .concatMap(new Func1<Response<ResponseBody>, Observable<File>>() {
                @Override
                public Observable<File> call(final Response<ResponseBody> responseBodyResponse) {
                    try {
                        File file = new File(parentFolder.getAbsoluteFile(), uri.getLastPathSegment());
                        if (!file.exists()) {
                            file.createNewFile();
                        }
                        BufferedSink sink = Okio.buffer(Okio.sink(file));
                        sink.writeAll(responseBodyResponse.body().source());
                        sink.close();
                        return Observable.just(file);
                    } catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<File>() {

                @Override
                public void onCompleted() {
                    dManager.logMe(TAG, "file download complete");
                }

                @Override
                public void onError(Throwable e) {
                    dManager.logMe(TAG, "file download error");
                    e.printStackTrace();
                }

                @Override
                public void onNext(File file) {
                    dManager.logMe(TAG, "file download onNext: " + file.getAbsolutePath());
                }
            });
z3yyvxxp

z3yyvxxp2#

下面是一个划痕示例

Observable.from(List <Url>)
                .flatMap(Observable.fromCallable(downloadImageHereAndReturnTheFile())
                        .observeOn(AndroidSchedulers.mainThread())
                        // Do on next is where you get notified when each url is getting downloaded
                        .doOnNext(doTheUIOperationHere())
                        .observeOn(Schedulers.io()) //Switch to background thread if you want, in case if you do some other background task otherwise remove the above this and the following next operator 
                        .doOnNext(DoSomeOtherOperationHere()))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<R>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "The entire process of downloading all the image completed here");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(R r) {
                        Log.d(TAG, "You receive the file for each url");
                    }
                });

相关问题