Usage and code examples of the io.reactivex.Observable.lift() method

Reposted by x33g5p2x on 2022-01-25 in Other

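This article collects code examples for the io.reactivex.Observable.lift() method in Java and shows how Observable.lift() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should be useful as references. Details of the Observable.lift() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: lift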

About Observable.lift

This method requires advanced knowledge about building operators; please consider other standard composition methods first.

Returns an Observable which, when subscribed to, invokes the ObservableOperator#apply(Observer) method of the provided ObservableOperator for each individual downstream Observer. This allows a custom operator to be inserted: during the subscription phase the operator accesses the downstream's Observer and provides a new Observer, containing the custom operator's intended business logic, which is then used in the subscription process going further upstream.

Generally, such a new Observer wraps the downstream's Observer and forwards the onNext, onError and onComplete events from the upstream, either directly or according to the emission pattern the custom operator's business logic requires. In addition, such an operator can intercept the flow control calls of dispose and isDisposed that would have traveled upstream and perform additional actions depending on the same business logic requirements.

Example:

// Imports assumed for this example (RxJava 2.x):
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

// Step 1: Create the consumer type that will be returned by the ObservableOperator.apply():
public final class CustomObserver<T> implements Observer<T>, Disposable {

    // The downstream's Observer that will receive the onXXX events
    final Observer<? super String> downstream;

    // The connection to the upstream source that will call this class' onXXX methods
    Disposable upstream;

    // The constructor takes the downstream subscriber and usually any other parameters
    public CustomObserver(Observer<? super String> downstream) {
        this.downstream = downstream;
    }

    // In the subscription phase, the upstream sends a Disposable to this class
    // and subsequently this class has to send a Disposable to the downstream.
    // Note that relaying the upstream's Disposable directly is not allowed in RxJava
    @Override
    public void onSubscribe(Disposable s) {
        if (upstream != null) {
            // A second onSubscribe is a protocol violation: dispose the extra connection
            s.dispose();
        } else {
            upstream = s;
            downstream.onSubscribe(this);
        }
    }

    // The upstream calls this with the next item and the implementation's
    // responsibility is to emit an item to the downstream based on the intended
    // business logic, or if it can't do so for the particular item,
    // request more from the upstream
    @Override
    public void onNext(T item) {
        String str = item.toString();
        if (str.length() < 2) {
            downstream.onNext(str);
        }
        // Observable doesn't support backpressure, therefore, there is no
        // need or opportunity to call upstream.request(1) if an item
        // is not produced to the downstream
    }

    // Some operators may handle the upstream's error while others
    // could just forward it to the downstream.
    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    // When the upstream completes, usually the downstream should complete as well.
    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    // Some operators may use their own resources which should be cleaned up if
    // the downstream disposes the flow before it completed. Operators without
    // resources can simply forward the dispose to the upstream.
    // In some cases, a disposed flag may be set by this method so that other parts
    // of this class may detect the dispose and stop sending events
    // to the downstream.
    @Override
    public void dispose() {
        upstream.dispose();
    }

    // Some operators may simply forward the call to the upstream while others
    // can return the disposed flag set in dispose().
    @Override
    public boolean isDisposed() {
        return upstream.isDisposed();
    }
}

// Step 2: Create a class that implements the ObservableOperator interface and
//         returns the custom consumer type from above in its apply() method.
//         Such class may define additional parameters to be submitted to
//         the custom consumer type.
//         The type arguments are <Downstream, Upstream>: this operator emits
//         String downstream and consumes T from the upstream.
final class CustomOperator<T> implements ObservableOperator<String, T> {
    @Override
    public Observer<? super T> apply(Observer<? super String> downstream) {
        return new CustomObserver<T>(downstream);
    }
}

// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
//         or reusing an existing one.
//         range(5, 10) emits 5..14; only the single-character strings pass the filter.
Observable.range(5, 10)
    .lift(new CustomOperator<Integer>())
    .test()
    .assertResult("5", "6", "7", "8", "9");
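The wiring described above (lift() asks the operator to wrap each downstream Observer and then subscribes the wrapper to the upstream) can be modelled without RxJava. The following is only a sketch: MiniObserver, MiniObservable and LiftModel are hypothetical stand-ins invented for illustration, and the model deliberately omits onSubscribe and Disposable handling, which a real operator must implement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class LiftModel {

    /** Hypothetical, simplified stand-in for io.reactivex.Observer (no onSubscribe). */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    /** Hypothetical, simplified stand-in for io.reactivex.Observable. */
    interface MiniObservable<T> {
        void subscribe(MiniObserver<? super T> observer);

        /** Models lift(): wrap the downstream observer, then subscribe the wrapper upstream. */
        default <R> MiniObservable<R> lift(
                Function<MiniObserver<? super R>, MiniObserver<? super T>> operator) {
            return downstream -> this.subscribe(operator.apply(downstream));
        }
    }

    /** Emits count consecutive integers starting at start, then completes. */
    static MiniObservable<Integer> range(int start, int count) {
        return observer -> {
            for (int i = start; i < start + count; i++) {
                observer.onNext(i);
            }
            observer.onComplete();
        };
    }

    /** Operator: converts items to strings and forwards only single-character ones. */
    static Function<MiniObserver<? super String>, MiniObserver<? super Integer>> shortStrings() {
        return downstream -> new MiniObserver<Integer>() {
            @Override public void onNext(Integer item) {
                String s = item.toString();
                if (s.length() < 2) {
                    downstream.onNext(s);
                }
            }
            @Override public void onError(Throwable e) { downstream.onError(e); }
            @Override public void onComplete() { downstream.onComplete(); }
        };
    }

    /** Runs the flow and records every signal the final observer receives. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        range(5, 10).lift(shortStrings()).subscribe(new MiniObserver<String>() {
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable e) { received.add("error"); }
            @Override public void onComplete() { received.add("done"); }
        });
        return received;
    }
}
```

Calling LiftModel.run() returns the list [5, 6, 7, 8, 9, done]: the operator's Observer sits between the source and the final consumer, and nothing happens until subscribe is called.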

Creating custom operators can be complicated, and it is recommended to consult the "Writing operators" page of the RxJava wiki about the tools, requirements, rules, considerations and pitfalls of implementing them.

Note that implementing custom operators via this lift() method adds slightly more overhead by requiring an additional allocation and indirection per assembled flow. Instead, extending the abstract Observable class and creating an ObservableTransformer with it is recommended.
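The recommended alternative might look roughly like the sketch below, which assumes RxJava 2.x (io.reactivex packages) on the classpath. The names ShortStringsExample, ShortStrings, ShortStringsObserver and shortStrings() are hypothetical; the filtering logic mirrors the CustomObserver from the example above.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import java.util.List;

final class ShortStringsExample {

    /** Hypothetical operator implemented by extending Observable instead of using lift(). */
    static final class ShortStrings<T> extends Observable<String> {
        final ObservableSource<T> source;

        ShortStrings(ObservableSource<T> source) {
            this.source = source;
        }

        @Override
        protected void subscribeActual(Observer<? super String> downstream) {
            // Unlike lift(), this method could also decide not to subscribe at all.
            source.subscribe(new ShortStringsObserver<T>(downstream));
        }
    }

    /** Same business logic as CustomObserver: forward only single-character strings. */
    static final class ShortStringsObserver<T> implements Observer<T>, Disposable {
        final Observer<? super String> downstream;
        Disposable upstream;

        ShortStringsObserver(Observer<? super String> downstream) {
            this.downstream = downstream;
        }

        @Override public void onSubscribe(Disposable d) {
            if (upstream != null) {
                d.dispose();
            } else {
                upstream = d;
                downstream.onSubscribe(this);
            }
        }
        @Override public void onNext(T item) {
            String s = item.toString();
            if (s.length() < 2) {
                downstream.onNext(s);
            }
        }
        @Override public void onError(Throwable e) { downstream.onError(e); }
        @Override public void onComplete() { downstream.onComplete(); }
        @Override public void dispose() { upstream.dispose(); }
        @Override public boolean isDisposed() { return upstream.isDisposed(); }
    }

    /** Exposes the operator as an ObservableTransformer for use with compose(). */
    static <T> ObservableTransformer<T, String> shortStrings() {
        return upstream -> new ShortStrings<T>(upstream);
    }

    static List<String> run() {
        return Observable.range(5, 10)
                .compose(ShortStringsExample.<Integer>shortStrings())
                .toList()
                .blockingGet();
    }
}
```

With this shape the flow is assembled through compose() rather than lift(), and ShortStringsExample.run() yields the same five strings "5" to "9" as the lift() example.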

Note also that it is not possible to stop the subscription phase in lift() as the apply() method requires a non-null Observer instance to be returned, which is then unconditionally subscribed to the upstream Observable. For example, if the operator decided there is no reason to subscribe to the upstream source because of some optimization possibility or a failure to prepare the operator, it still has to return an Observer that should immediately dispose the upstream's Disposable in its onSubscribe method. Again, using an ObservableTransformer and extending the Observable is a better option as subscribeActual() can decide to not subscribe to its upstream after all.

Scheduler: lift does not operate by default on a particular Scheduler; however, the ObservableOperator may use a Scheduler to support its own asynchronous behavior.
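The point about not being able to stop the subscription phase can be illustrated with an operator that has already decided it does not need the upstream. This is a minimal sketch assuming RxJava 2.x; SkipUpstreamExample, SkipUpstream and upstreamItems are hypothetical names, and completing the downstream with an empty result is just one possible policy.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class SkipUpstreamExample {

    /** Hypothetical operator that never wants upstream items. */
    static final class SkipUpstream<T> implements ObservableOperator<T, T> {
        @Override
        public Observer<? super T> apply(final Observer<? super T> downstream) {
            // lift() subscribes whatever is returned here, so the only way out
            // is to dispose the upstream as soon as its Disposable arrives.
            return new Observer<T>() {
                @Override public void onSubscribe(Disposable upstream) {
                    upstream.dispose();
                    downstream.onSubscribe(Disposables.disposed());
                    downstream.onComplete();
                }
                @Override public void onNext(T item) {
                    // Ignored: the flow has already been terminated.
                }
                @Override public void onError(Throwable e) {
                    // Too late to deliver downstream; report as undeliverable.
                    RxJavaPlugins.onError(e);
                }
                @Override public void onComplete() {
                    // Ignored: downstream was completed in onSubscribe.
                }
            };
        }
    }

    /** Counts items the upstream actually produced before the operator. */
    static final AtomicInteger upstreamItems = new AtomicInteger();

    static List<Integer> run() {
        return Observable.range(1, 5)
                .doOnNext(i -> upstreamItems.incrementAndGet())
                .lift(new SkipUpstream<Integer>())
                .toList()
                .blockingGet();
    }
}
```

The upstream is still subscribed to (which is the overhead the text warns about), but it is disposed before it emits anything, so the downstream only sees an empty, completed flow.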

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void liftNull() {
  just1.lift(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void liftReturnsNull() {
  just1.lift(new ObservableOperator<Object, Integer>() {
    @Override
    public Observer<? super Integer> apply(Observer<? super Object> observer) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void testOnStartCalledOnceViaLift() {
  final AtomicInteger c = new AtomicInteger();
  Observable.just(1, 2, 3, 4).lift(new ObservableOperator<Integer, Integer>() {
    @Override
    public Observer<? super Integer> apply(final Observer<? super Integer> child) {
      return new DefaultObserver<Integer>() {
        @Override
        public void onStart() {
          c.incrementAndGet();
        }
        @Override
        public void onComplete() {
          child.onComplete();
        }
        @Override
        public void onError(Throwable e) {
          child.onError(e);
        }
        @Override
        public void onNext(Integer t) {
          child.onNext(t);
        }
      };
    }
  }).subscribe();
  assertEquals(1, c.get());
}

Code example source: ReactiveX/RxJava

public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperatorOnNext() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.just(1).lift(new ObservableOperator<String, Integer>() {
  // ... (the snippet is truncated here in the source)

Code example source: ReactiveX/RxJava

/**
 * Test that we receive the onError if an exception is thrown from an operator that
 * does not have manual try/catch handling like map does.
 */
@Test
@Ignore("Failed operator may leave the child Observer in an inconsistent state which prevents further error delivery.")
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.just(1).lift(new ObservableOperator<String, Integer>() {
    @Override
    public Observer<? super Integer> apply(Observer<? super String> t1) {
      throw new RuntimeException("failed");
    }
  }).onErrorResumeNext(new Function<Throwable, Observable<String>>() {
    @Override
    public Observable<String> apply(Throwable t1) {
      if (t1.getMessage().equals("failed")) {
        return Observable.just("success");
      } else {
        return Observable.error(t1);
      }
    }
  }).subscribe(to);
  to.assertTerminated();
  System.out.println(to.values());
  to.assertValue("success");
}

Code example source: ReactiveX/RxJava

@Test
public void callbackCrash() {
  try {
    Observable.just(1)
    .lift(new ObservableOperator<Object, Integer>() {
      @Override
      public Observer<? super Integer> apply(Observer<? super Object> o) throws Exception {
        throw new TestException();
      }
    })
    .test();
    fail("Should have thrown");
  } catch (NullPointerException ex) {
    assertTrue(ex.toString(), ex.getCause() instanceof TestException);
  }
}

Code example source: square/sqlbrite

@Test public void mapToListIgnoresNullCursor() {
 Query nully = new Query() {
  @Nullable @Override public Cursor run() {
   return null;
  }
 };
 TestObserver<List<Employee>> subscriber = new TestObserver<>();
 Observable.just(nully)
   .lift(Query.mapToList(MAPPER))
   .subscribe(subscriber);
 subscriber.assertNoValues();
 subscriber.assertComplete();
}

Code example source: square/sqlbrite

@Test public void mapToOneIgnoresNullCursor() {
 Query nully = new Query() {
  @Nullable @Override public Cursor run() {
   return null;
  }
 };
 TestObserver<Employee> observer = new TestObserver<>();
 Observable.just(nully)
   .lift(Query.mapToOne(MAPPER))
   .subscribe(observer);
 observer.assertNoValues();
 observer.assertComplete();
}

Code example source: square/sqlbrite

@SdkSuppress(minSdkVersion = Build.VERSION_CODES.N)
@Test public void mapToOptionalIgnoresNullCursor() {
 Query nully = new Query() {
  @Nullable @Override public Cursor run() {
   return null;
  }
 };

 Observable.just(nully)
   .lift(Query.mapToOptional(MAPPER))
   .test()
   .assertValue(Optional.<Employee>empty());
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitSingleEventWithSinglePermit() {
  Observable.just(1)
    .lift(RateLimiterOperator.of(rateLimiter))
    .test()
    .assertResult(1);
  assertSinglePermitUsed();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitAllEvents() {
  Observable.fromArray("Event 1", "Event 2")
    .lift(BulkheadOperator.of(bulkhead))
    .test()
    .assertResult("Event 1", "Event 2");
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  Observable.error(new IOException("BAM!"))
    .lift(BulkheadOperator.of(bulkhead))
    .test()
    .assertSubscribed()
    .assertError(IOException.class)
    .assertNotComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: ReactiveX/RxJava

.lift(new ObservableOperator<Long, Long>() {
  @Override
  public Observer<? super Long> apply(final Observer<? super Long> child) {
  // ... (the snippet is truncated here in the source)

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithRequestNotPermittedException() {
  saturateRateLimiter();
  Observable.just(1)
    .lift(RateLimiterOperator.of(rateLimiter))
    .test()
    .assertSubscribed()
    .assertError(RequestNotPermitted.class)
    .assertNotComplete();
  assertNoPermitLeft();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithBulkheadFullException() {
  bulkhead.isCallPermitted();
  Observable.fromArray("Event 1", "Event 2")
    .lift(BulkheadOperator.of(bulkhead))
    .test()
    .assertSubscribed()
    .assertError(BulkheadFullException.class)
    .assertNotComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  Observable.error(new IOException("BAM!"))
    .lift(CircuitBreakerOperator.of(circuitBreaker))
    .test()
    .assertSubscribed()
    .assertError(IOException.class)
    .assertNotComplete();
  assertSingleFailedCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  Observable.error(new IOException("BAM!"))
    .lift(RateLimiterOperator.of(rateLimiter))
    .test()
    .assertSubscribed()
    .assertError(IOException.class)
    .assertNotComplete();
  assertSinglePermitUsed();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitAllEvents() {
  Observable.fromArray(1, 2)
    .lift(RateLimiterOperator.of(rateLimiter))
    .test()
    .assertResult(1, 2);
  assertUsedPermits(2);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitAllEvents() {
  Observable.fromArray("Event 1", "Event 2")
    .lift(CircuitBreakerOperator.of(circuitBreaker))
    .test()
    .assertResult("Event 1", "Event 2");
  assertSingleSuccessfulCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithCircuitBreakerOpenException() {
  circuitBreaker.transitionToOpenState();
  Observable.fromArray("Event 1", "Event 2")
    .lift(CircuitBreakerOperator.of(circuitBreaker))
    .test()
    .assertSubscribed()
    .assertError(CircuitBreakerOpenException.class)
    .assertNotComplete();
  assertNoRegisteredCall();
}
