// assuming each observable returns response in the form of String
Observable<String> movOb = Observable.create(...);
// if you use Retrofit
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...),
Observable.zip(movOb, picOb, new Func2<String, String, MyResult>() {
@Override
public MyResult call(String movieUploadResponse, String picUploadResponse) {
// analyze both responses, upload them to another server
// and return this method with a MyResult type
return myResult;
}
}
)
// continue chaining this observable with subscriber
// or use it for something else
//
// API Client Interface
//
@GET(ServicesConstants.API_PREFIX + "questions/{id}/")
Single<Response<ResponseGeneric<List<ResponseQuestion>>>> getBaseQuestions(@Path("id") int personId);
@GET(ServicesConstants.API_PREFIX + "physician/{id}/")
Single<Response<ResponseGeneric<List<ResponsePhysician>>>> getPhysicianInfo(@Path("id") int personId);
//
// API middle layer - NOTE: I had feedback that the Single.create is not needed (but I haven't yet spent the time to improve it)
//
public Single<List<ResponsePhysician>> getPhysicianInfo(int personId) {
return Single.create(subscriber -> {
apiClient.getPhysicianInfo(appId)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(response -> {
ResponseGeneric<List<ResponsePhysician>> responseBody = response.body();
if(responseBody != null && responseBody.statusCode == 1) {
if (!subscriber.isDisposed()) subscriber.onSuccess(responseBody.data);
} else if(response.body() != null && response.body().status != null ){
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
} else {
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
}
}, throwable -> {
throwable.printStackTrace();
if(!subscriber.isDisposed()) subscriber.onError(throwable);
});
});
}
public Single<List<ResponseQuestion>> getHealthQuestions(int personId){
return Single.create(subscriber -> {
apiClient.getBaseQuestions(personId)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(response -> {
ResponseGeneric<List<ResponseQuestion>> responseBody = response.body();
if(responseBody != null && responseBody.data != null) {
if (!subscriber.isDisposed()) subscriber.onSuccess(response.body().data);
} else if(response.body() != null && response.body().status != null ){
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
} else {
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
}
}, throwable -> {
throwable.printStackTrace();
if(!subscriber.isDisposed()) subscriber.onError(throwable);
});
});
}
//please note that ResponseGeneric is just an outer wrapper of the returned data - common to all API's in this project
public class ResponseGeneric<T> {
@SerializedName("Status")
public String status;
@SerializedName("StatusCode")
public float statusCode;
@SerializedName("Data")
public T data;
}
//
// API end-use layer - this gets close to the UI so notice the oberver is set for main thread
//
private static class MergedResponse{// this is just a POJO to store all the responses in one object
public List<ResponseQuestion> listQuestions;
public List<ResponsePhysician> listPhysicians;
public MergedResponse(List<ResponseQuestion> listQuestions, List<ResponsePhysician> listPhysicians){
this.listQuestions = listQuestions;
this.listPhysicians = listPhysicians;
}
}
// example of Single.zip() - calls getHealthQuestions() and getPhysicianInfo() from API Middle Layer
private void downloadHealthQuestions(int personId) {
addRxSubscription(Single
.zip(getHealthQuestions(personId), getPhysicianInfo(personId), MergedResponse::new)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(response -> {
if(response != null) {
Timber.i(" - total health questions downloaded %d", response.listQuestions.size());
Timber.i(" - physicians downloaded %d", response.listPhysicians.size());
if (response.listPhysicians != null && response.listPhysicians.size()>0) {
// do your stuff to process response data
}
if (response.listQuestions != null && response.listQuestions.size()>0) {
// do your stuff to process response data
}
} else {
// process error - show message
}
}, error -> {
// process error - show network error message
}));
}
8条答案
按热度按时间ilmyapht1#
Zip操作符严格地配对从观察对象发出的项。它等待两个(或更多)项到达,然后合并它们。所以是的,这将适合您的需要。
我会使用
Func2
链接前两个观察对象的结果,注意如果使用Retrofit,这种方法会更简单,因为它的API接口可能会返回一个观察对象,否则你需要创建自己的观察对象。tyky79it2#
举个小例子:
这将打印:
在
BiFunction<String, String, String>
中,第一个String
表示第一个可观察值的类型,第二个String
表示第二个可观察值的类型,第三个String
表示zippers函数返回值的类型。我在this blog post中创建了一个使用zip调用两个真实的端点的小示例
yhived7q3#
这里我有一个例子,我使用Zip异步的方式,只是为了防止你好奇
您可以在此处查看更多示例https://github.com/politrons/reactive
i5desfxk4#
zip
运算符允许您从两个不同可观察结果合成结果。您必须给予一个lambda,它将从每个可观察对象发出的数据中创建一个结果。
ckx4rj1h5#
我一直在寻找一个简单的答案,如何使用Zip操作符,以及如何处理我创建的Observables来将它们传递给它,我想知道我是否应该为每个Observable调用subscribe(),这些答案都不容易找到,我必须自己弄清楚,所以这里有一个简单的例子,在2个Observables上使用Zip操作符:
打印结果为:
关于我脑子里在哪里的问题的最终答案如下
传递给拉链的观测值()方法只需要被创建,它们不需要有任何订阅者,只需要创建它们就足够了......如果你想让任何可观察对象在调度器上运行,你可以为那个可观察对象指定这个......我也试过zip()运算符,在此它们应该等待结果,并且zip()的Consumable仅在两个结果都准备就绪时才被触发(这是预期行为)
e5nqia276#
这是我使用Single.zip和rxJava2实现的
我试着让它尽可能容易理解
h4cxqtbf7#
将
rxjava
中的zip
与Java 8
一起使用:at0kjp5o8#
可以对可观察链使用
.zipWith
运算符。如果
uploadMovies()
和uploadPictures()
返回可观测值,