rxjava将2个调用与错误处理结合起来,失败时会有延迟

o2gm4chl  于 2021-06-30  发布在  Java
关注(0)|答案(1)|浏览(340)

用例有两个数据源:
服务1-从源1获取
服务2-从源2获取
应用程序应该至少从source-1返回数据。如果source-2一切正常-数据将被“增强”,比如乘以100。
服务1呼叫服务2。
如果所有成功的用户都从服务1和服务2获取数据(如果服务2上有错误),则用户仅从服务1获取数据(至少),如果服务1上有错误,则用户将获取错误。
有一个hello world bench代码,它模拟了这个场景:

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;

class Response {

    public Integer value;
    public String warning;
    public Response(Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Response{" +
                "value=" + value +
                ", warning='" + warning + '\'' +

                '}';
    }
}

class Service1 {

    public Observable<Response> call(int arg) {
        return Observable
                .just(
                        new Response(1),
                        new Response(2),
                        new Response(3),
                        new Response(4))
                .delay(100, TimeUnit.MILLISECONDS);
    }
}

class Service2 {

    public Observable<Response> call(int arg) {

        if ( arg % 2 == 0) {

            System.out.println("service 2: " + arg);

            return Observable
                    .just(new Response(100 * arg)) // service 2 multiplies x 100 on the result it gets from the service 1 
                    .delay(10, TimeUnit.MILLISECONDS);

        } else {

            System.out.println("service 2: " + arg);

            return Observable.error(new RuntimeException("service 2 error"));
        }
    }
}

public class Step1 {

    static Service1 service1 = new Service1();
    static Service2 service2 = new Service2();

    public static void main(String[] args) throws InterruptedException {

        var oo1 = service1.call(1);

        var oo3 = oo1.switchMapDelayError(x -> {

            final Observable<Response> oo2 = service2.call(x.value);

            return oo2
                    .onErrorReturn((ex) -> {
                        //System.out.println("Error handling..." + ex.getMessage() + " " + x);
                        x.warning = ex.getMessage();
                        return x; // returns at least service1 result
                    });
        });

        oo3.subscribe(x -> {
            System.out.println(x);
        });

        Thread.sleep(100000);
    }

}

此代码的结果是:

service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}

问题是:没有预期的: value=200 2*100
但是,如果我在service2.call()//.delay(10,timeunit.millishes)处注解一个延迟,那么它将得到预期的结果:

service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
Response{value=200, warning='null'}
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}

问题是:为什么 .delay(10, TimeUnit.MILLISECONDS) on service2.call() 它不能产生值=200?这个解决方案有什么问题,我错过了什么?
谢谢。

6rqinv9w

6rqinv9w1#

你的问题是 switchMapDelayError 接线员。您应该使用concatmap或flatmap
我冒昧地为您的用例编写了一个测试。请注意,始终使用重载来提供 Scheduler 为了提供 TestScheduler 用于测试。

switchmap做什么?

在每个上游emit switchmap上订阅给定的内部流。当一个新的值从上游发出时,旧的内部流被取消订阅,并且switchmap的lambda被再次调用以订阅新的内部流。
问题可能是以下代码:

return Observable
            .just(
                    new Response(1),
                    new Response(2),
                    new Response(3),
                    new Response(4))
            .delay(100, TimeUnit.MILLISECONDS);

它几乎一个接一个地在堆栈上发出响应1到4,并且每个发出都在另一个线程上延迟。因此,响应1到4几乎会立即发出。它们不会像以下那样发出:100ms时的响应(1),200ms时的响应(2),等等。
让我们看看输出是为了什么

Observable.just(
    new Response(1), //
    new Response(2),
    new Response(3),
    new Response(4))
    .delay(100, TimeUnit.MILLISECONDS)
    .subscribe(r -> {
      System.out.println("received value at " + Schedulers.io().now(TimeUnit.MILLISECONDS));
    });

输出

received value at 1607432032768
received value at 1607432032769
received value at 1607432032769
received value at 1607432032769

因此,所有值几乎都会立即发出,并用switchmap相互覆盖。以前发出的值几乎立即被新值抵消。

解决方案

使用concatmap或flatmap或更改测试设置以100ms的间隔发出每个值。
flatmap只订阅每个值,默认情况下最多128个内部流。当内部流完成时,concatmap将只订阅下一个值。
测试

public class So65193002 {
      @Test
      void so() {
        TestScheduler testScheduler = new TestScheduler();
        Service1 service1 = new Service1(testScheduler);
        Service2 service2 = new Service2(testScheduler);

        Observable<Response> service1Call = service1.call(1);

        Observable<Response> combined =
            service1Call.concatMapEagerDelayError(
                x -> {
                  return service2
                      .call(x.value)
                      .onErrorReturn(
                          (ex) -> {
                            x.warning = ex.getMessage();
                            return x; // returns at least service1 result
                          });
                },
                true);

        TestObserver<Response> test = combined.test();

        testScheduler.advanceTimeBy(1, TimeUnit.HOURS);

        test.assertValueCount(4)
            .assertValueAt(
                0,
                r -> {
                  assertThat(r.value).isEqualTo(1);
                  assertThat(r.warning).isNotEmpty();
                  return true;
                })
            .assertValueAt(
                1,
                r -> {
                  assertThat(r.value).isEqualTo(200);
                  assertThat(r.warning).isNull();
                  return true;
                })
            .assertValueAt(
                3,
                r -> {
                  assertThat(r.value).isEqualTo(400);
                  assertThat(r.warning).isNull();
                  return true;
                });
      }
    }

class Response {
  public Integer value;
  public String warning;

  public Response(Integer value) {
    this.value = value;
  }

  @Override
  public String toString() {
    return "Response{" + "value=" + value + ", warning='" + warning + '\'' + '}';
  }
}

class Service1 {
  private final Scheduler scheduler;

  Service1(Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  public Observable<Response> call(int arg) {
    return Observable.just(
            new Response(1), //
            new Response(2),
            new Response(3),
            new Response(4))
        .delay(100, TimeUnit.MILLISECONDS, scheduler);
  }
}

class Service2 {
  private final Scheduler scheduler;

  Service2(Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  public Observable<Response> call(int arg) {
    if (arg % 2 == 0) {
      return Observable.just(new Response(100 * arg)).delay(10, TimeUnit.MILLISECONDS, scheduler);

    } else {
      return Observable.error(new RuntimeException("service 2 error"));
    }
  }
}

注意

不要使用可变对象。始终确保发出的值是不可变的,否则会遇到麻烦。

相关问题