
x33g5p2x, reposted on 2022-01-25 in Other



[English] This method requires advanced knowledge about building operators; please consider other standard composition methods first. Returns an Observable which, when subscribed to, invokes the ObservableOperator#apply(Observer) method of the provided ObservableOperator for each individual downstream Observer. This allows the insertion of a custom operator: during the subscription phase, the operator accesses the downstream's Observer and provides a new Observer, containing the custom operator's intended business logic, that is used in the subscription process going further upstream.

Generally, such a new Observer will wrap the downstream's Observer and forward the onNext, onError and onComplete events from the upstream either directly or according to the emission pattern the custom operator's business logic requires. In addition, such an operator can intercept the flow control calls of dispose and isDisposed that would have traveled upstream and perform additional actions depending on the same business logic requirements.


// Step 1: Create the consumer type that will be returned by the ObservableOperator.apply():
public final class CustomObserver<T> implements Observer<T>, Disposable {

    // The downstream's Observer that will receive the onXXX events
    final Observer<? super String> downstream;

    // The connection to the upstream source that will call this class' onXXX methods
    Disposable upstream;

    // The constructor takes the downstream subscriber and usually any other parameters
    public CustomObserver(Observer<? super String> downstream) {
        this.downstream = downstream;
    }

    // In the subscription phase, the upstream sends a Disposable to this class
    // and subsequently this class has to send a Disposable to the downstream.
    // Note that relaying the upstream's Disposable directly is not allowed in RxJava
    @Override
    public void onSubscribe(Disposable s) {
        if (upstream != null) {
            s.dispose();
        } else {
            upstream = s;
            downstream.onSubscribe(this);
        }
    }

    // The upstream calls this with the next item and the implementation's
    // responsibility is to emit an item to the downstream based on the intended
    // business logic, or if it can't do so for the particular item,
    // request more from the upstream
    @Override
    public void onNext(T item) {
        String str = item.toString();
        if (str.length() < 2) {
            downstream.onNext(str);
        }
        // Observable doesn't support backpressure, therefore, there is no
        // need or opportunity to call upstream.request(1) if an item
        // is not produced to the downstream
    }

    // Some operators may handle the upstream's error while others
    // could just forward it to the downstream.
    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    // When the upstream completes, usually the downstream should complete as well.
    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    // Some operators may use their own resources which should be cleaned up if
    // the downstream disposes the flow before it completed. Operators without
    // resources can simply forward the dispose to the upstream.
    // In some cases, a disposed flag may be set by this method so that other parts
    // of this class may detect the dispose and stop sending events
    // to the downstream.
    @Override
    public void dispose() {
        upstream.dispose();
    }

    // Some operators may simply forward the call to the upstream while others
    // can return the disposed flag set in dispose().
    @Override
    public boolean isDisposed() {
        return upstream.isDisposed();
    }
}

// Step 2: Create a class that implements the ObservableOperator interface and
//         returns the custom consumer type from above in its apply() method.
//         Such class may define additional parameters to be submitted to
//         the custom consumer type.
final class CustomOperator<T> implements ObservableOperator<String, T> {
    @Override
    public Observer<? super T> apply(Observer<? super String> downstream) {
        return new CustomObserver<T>(downstream);
    }
}

// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
//         or reusing an existing one.
Observable.range(5, 10)
.lift(new CustomOperator<Integer>())
.test()
.assertResult("5", "6", "7", "8", "9");

Creating custom operators can be complicated; consult the "Writing operators" page of the RxJava wiki for the tools, requirements, rules, considerations and pitfalls of implementing them.

Note that implementing custom operators via this lift() method adds slightly more overhead, because it requires an additional allocation and indirection per assembled flow. Instead, extending the abstract Observable class and creating an ObservableTransformer with it is recommended.
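The recommended alternative could look roughly like the sketch below, which rewrites the short-string filter from the lift() example as an Observable subclass exposed through an ObservableTransformer. To stay runnable without the library on the classpath, it declares minimal stand-ins named after the RxJava types; these are assumptions that only mirror the shape of the real interfaces (for instance, the real compose() uses wildcard bounds). Range, ShortStrings and ShortStringsObserver are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ShortStringsSketch {

    // Minimal stand-ins mirroring the shape of the RxJava types (NOT the library itself).
    interface Disposable { void dispose(); boolean isDisposed(); }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T item);
        void onError(Throwable e);
        void onComplete();
    }

    interface ObservableTransformer<Up, Down> {
        Observable<Down> apply(Observable<Up> upstream);
    }

    abstract static class Observable<T> {
        protected abstract void subscribeActual(Observer<? super T> observer);

        public final void subscribe(Observer<? super T> observer) {
            subscribeActual(observer);
        }

        public final <R> Observable<R> compose(ObservableTransformer<T, R> transformer) {
            return transformer.apply(this);
        }
    }

    // Hypothetical synchronous source standing in for Observable.range(start, count).
    static final class Range extends Observable<Integer> {
        final int start;
        final int count;

        Range(int start, int count) { this.start = start; this.count = count; }

        @Override
        protected void subscribeActual(Observer<? super Integer> observer) {
            final boolean[] disposed = { false };
            observer.onSubscribe(new Disposable() {
                @Override public void dispose() { disposed[0] = true; }
                @Override public boolean isDisposed() { return disposed[0]; }
            });
            for (int i = start; i < start + count && !disposed[0]; i++) {
                observer.onNext(i);
            }
            if (!disposed[0]) {
                observer.onComplete();
            }
        }
    }

    // The operator written as an Observable subclass instead of an ObservableOperator.
    static final class ShortStrings<T> extends Observable<String> {
        final Observable<T> source;

        ShortStrings(Observable<T> source) { this.source = source; }

        @Override
        protected void subscribeActual(Observer<? super String> downstream) {
            // An operator could also decide here not to subscribe to the source at all.
            source.subscribe(new ShortStringsObserver<T>(downstream));
        }
    }

    // Same filtering logic as the CustomObserver in the lift() example.
    static final class ShortStringsObserver<T> implements Observer<T>, Disposable {
        final Observer<? super String> downstream;
        Disposable upstream;

        ShortStringsObserver(Observer<? super String> downstream) { this.downstream = downstream; }

        @Override public void onSubscribe(Disposable d) { upstream = d; downstream.onSubscribe(this); }
        @Override public void onNext(T item) {
            String str = item.toString();
            if (str.length() < 2) { downstream.onNext(str); }
        }
        @Override public void onError(Throwable e) { downstream.onError(e); }
        @Override public void onComplete() { downstream.onComplete(); }
        @Override public void dispose() { upstream.dispose(); }
        @Override public boolean isDisposed() { return upstream.isDisposed(); }
    }

    // Runs Range(5, 10) through the transformer and collects what the downstream receives.
    static List<String> run() {
        final List<String> received = new ArrayList<>();
        ObservableTransformer<Integer, String> shortStrings = upstream -> new ShortStrings<>(upstream);
        new Range(5, 10).compose(shortStrings).subscribe(new Observer<String>() {
            @Override public void onSubscribe(Disposable d) { }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable e) { received.add("error: " + e); }
            @Override public void onComplete() { received.add("complete"); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With RxJava itself on the classpath, the stand-in declarations would be dropped in favor of the library's own Observable, Observer, Disposable and ObservableTransformer types; the operator classes should need little more than that change, though this has not been verified against a specific RxJava version.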

Note also that it is not possible to stop the subscription phase in lift(): the apply() method must return a non-null Observer instance, which is then unconditionally subscribed to the upstream Observable. For example, if the operator decides there is no reason to subscribe to the upstream source, because of some optimization possibility or a failure to prepare the operator, it still has to return an Observer that immediately disposes the upstream's Disposable in its onSubscribe method. Again, using an ObservableTransformer and extending Observable is a better option, as subscribeActual can decide not to subscribe to its upstream after all.

Scheduler: lift does not operate by default on a particular Scheduler; however, the ObservableOperator may use a Scheduler to support its own asynchronous behavior.
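The point that a lift()-based operator must still return an Observer, and can only dispose the upstream once onSubscribe arrives, can be sketched as follows. As an assumption made for runnability, the block declares minimal stand-ins that mirror the shape of the RxJava Disposable, Observer and ObservableOperator interfaces rather than importing the library. CompleteWithoutUpstream and DISPOSED are hypothetical names, and completing the downstream (rather than signalling an error) is just one possible choice.

```java
import java.util.ArrayList;
import java.util.List;

final class DisposeImmediatelySketch {

    // Minimal stand-ins mirroring the shape of the RxJava types (NOT the library itself).
    interface Disposable { void dispose(); boolean isDisposed(); }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T item);
        void onError(Throwable e);
        void onComplete();
    }

    interface ObservableOperator<Downstream, Upstream> {
        Observer<? super Upstream> apply(Observer<? super Downstream> observer) throws Exception;
    }

    // Stand-in for an already-disposed Disposable handed to the downstream.
    static final Disposable DISPOSED = new Disposable() {
        @Override public void dispose() { }
        @Override public boolean isDisposed() { return true; }
    };

    // Hypothetical operator that has decided it does not need the upstream at all.
    static final class CompleteWithoutUpstream<T> implements ObservableOperator<T, T> {
        @Override
        public Observer<? super T> apply(final Observer<? super T> downstream) {
            return new Observer<T>() {
                @Override public void onSubscribe(Disposable d) {
                    d.dispose();                       // cut the unwanted upstream connection
                    downstream.onSubscribe(DISPOSED);  // downstream still needs onSubscribe first
                    downstream.onComplete();           // then a terminal event
                }
                // A well-behaved upstream stops after dispose(); ignore anything that still arrives.
                @Override public void onNext(T item) { }
                @Override public void onError(Throwable e) { }
                @Override public void onComplete() { }
            };
        }
    }

    // Drives the operator by hand, playing the role of the upstream source.
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        Observer<Integer> downstream = new Observer<Integer>() {
            @Override public void onSubscribe(Disposable d) { events.add("onSubscribe"); }
            @Override public void onNext(Integer item) { events.add("onNext " + item); }
            @Override public void onError(Throwable e) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        final boolean[] upstreamDisposed = { false };
        Disposable upstream = new Disposable() {
            @Override public void dispose() { upstreamDisposed[0] = true; events.add("upstream.dispose"); }
            @Override public boolean isDisposed() { return upstreamDisposed[0]; }
        };
        Observer<? super Integer> lifted = new CompleteWithoutUpstream<Integer>().apply(downstream);
        lifted.onSubscribe(upstream);
        lifted.onNext(1);      // a late item is swallowed
        lifted.onComplete();   // as is a late completion
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The upstream subscription is still established before it is cancelled, which is the cost the text above describes; an Observable subclass can skip it entirely in subscribeActual.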




Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void liftNull() {
  just1.lift(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void liftReturnsNull() {
  just1.lift(new ObservableOperator<Object, Integer>() {
    public Observer<? super Integer> apply(Observer<? super Object> observer) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

public void testOnStartCalledOnceViaLift() {
  final AtomicInteger c = new AtomicInteger();
  Observable.just(1, 2, 3, 4).lift(new ObservableOperator<Integer, Integer>() {
    public Observer<? super Integer> apply(final Observer<? super Integer> child) {
      return new DefaultObserver<Integer>() {
        public void onStart() { c.incrementAndGet(); }
        public void onComplete() { child.onComplete(); }
        public void onError(Throwable e) { child.onError(e); }
        public void onNext(Integer t) { child.onNext(t); }
      };
    }
  }).subscribe();
  assertEquals(1, c.get());
}

Code example source: ReactiveX/RxJava

public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperatorOnNext() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.just(1).lift(new ObservableOperator<String, Integer>() {

Code example source: ReactiveX/RxJava

/**
 * Test that we receive the onError if an exception is thrown from an operator that
 * does not have manual try/catch handling like map does.
 */
@Ignore("Failed operator may leave the child Observer in an inconsistent state which prevents further error delivery.")
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.just(1).lift(new ObservableOperator<String, Integer>() {
    public Observer<? super Integer> apply(Observer<? super String> t1) {
      throw new RuntimeException("failed");
    }
  }).onErrorResumeNext(new Function<Throwable, Observable<String>>() {
    public Observable<String> apply(Throwable t1) {
      if (t1.getMessage().equals("failed")) {
        return Observable.just("success");
      } else {
        return Observable.error(t1);
      }
    }
  }).subscribe(to);
  to.assertValue("success");
}

Code example source: ReactiveX/RxJava

public void callbackCrash() {
  try {
    Observable.just(1)
    .lift(new ObservableOperator<Object, Integer>() {
      public Observer<? super Integer> apply(Observer<? super Object> o) throws Exception {
        throw new TestException();
      }
    })
    .test();
    fail("Should have thrown");
  } catch (NullPointerException ex) {
    assertTrue(ex.toString(), ex.getCause() instanceof TestException);
  }
}

Code example source: square/sqlbrite

@Test public void mapToListIgnoresNullCursor() {
 Query nully = new Query() {
  @Nullable @Override public Cursor run() {
   return null;
 TestObserver<List<Employee>> subscriber = new TestObserver<>();

Code example source: square/sqlbrite

@Test public void mapToOneIgnoresNullCursor() {
 Query nully = new Query() {
  @Nullable @Override public Cursor run() {
   return null;
 TestObserver<Employee> observer = new TestObserver<>();

Code example source: square/sqlbrite

@SdkSuppress(minSdkVersion = Build.VERSION_CODES.N)
 @Test public void mapToOptionalIgnoresNullCursor() {
  Query nully = new Query() {
   @Nullable @Override public Cursor run() {
    return null;


Code example source: resilience4j/resilience4j

public void shouldEmitSingleEventWithSinglePermit() {

Code example source: resilience4j/resilience4j

public void shouldEmitAllEvents() {
  Observable.fromArray("Event 1", "Event 2")
    .assertResult("Event 1", "Event 2");

Code example source: resilience4j/resilience4j

public void shouldPropagateError() {
  Observable.error(new IOException("BAM!"))

Code example source: ReactiveX/RxJava

.lift(new ObservableOperator<Long, Long>() {
public Observer<? super Long> apply(final Observer<? super Long> child) {

Code example source: resilience4j/resilience4j

public void shouldEmitErrorWithRequestNotPermittedException() {

Code example source: resilience4j/resilience4j

public void shouldEmitErrorWithBulkheadFullException() {
  Observable.fromArray("Event 1", "Event 2")

Code example source: resilience4j/resilience4j

public void shouldPropagateError() {
  Observable.error(new IOException("BAM!"))

Code example source: resilience4j/resilience4j

public void shouldEmitAllEvents() {
  Observable.fromArray(1, 2)
    .assertResult(1, 2);

Code example source: resilience4j/resilience4j

public void shouldEmitAllEvents() {
  Observable.fromArray("Event 1", "Event 2")
    .assertResult("Event 1", "Event 2");

Code example source: resilience4j/resilience4j

public void shouldEmitErrorWithCircuitBreakerOpenException() {
  Observable.fromArray("Event 1", "Event 2")

