Usage and code examples of the rx.Observable.switchMap() method

x33g5p2x, reposted on 2022-01-25 in Other

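This article collects code examples of the rx.Observable.switchMap() method in Java and shows how Observable.switchMap() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.switchMap() method are as follows:
Fully qualified class: rx.Observable
Class name: Observable
Method name: switchMap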

Introduction to Observable.switchMap

Returns a new Observable by applying a function that you supply to each item emitted by the source Observable that returns an Observable, and then emitting the items emitted by the most recently emitted of these Observables. When the source emits a new item, the previously mapped Observable is unsubscribed, so any items it would have emitted later are dropped.

Scheduler: switchMap does not operate by default on a particular Scheduler.
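
The "most recently emitted" rule can be illustrated without RxJava at all. The following is a minimal plain-JDK sketch, not the library's implementation: it models each emission as a value tagged with a virtual time in milliseconds and keeps an inner emission only if it occurs before the next source item arrives. The names `SwitchMapSketch`, `Timed`, and `demo` are hypothetical, the source list is assumed to be sorted by time, and the tie-breaking choice (an inner item falling on exactly the same instant as the next source item is dropped) is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SwitchMapSketch {

    /** A value paired with the virtual time (in ms) at which it is emitted. Hypothetical helper type. */
    public static final class Timed<T> {
        final long time;
        final T value;

        public Timed(long time, T value) {
            this.time = time;
            this.value = value;
        }
    }

    /**
     * Models switchMap on virtual time. Inner times are relative to the moment
     * the inner sequence is "subscribed", i.e. the arrival time of its source item.
     * Assumes `outer` is sorted by ascending time.
     */
    public static <T, R> List<R> switchMap(List<Timed<T>> outer,
                                           Function<T, List<Timed<R>>> mapper) {
        List<R> result = new ArrayList<>();
        for (int i = 0; i < outer.size(); i++) {
            Timed<T> current = outer.get(i);
            // The next source item "unsubscribes" the current inner sequence.
            long cutoff = (i + 1 < outer.size()) ? outer.get(i + 1).time : Long.MAX_VALUE;
            for (Timed<R> inner : mapper.apply(current.value)) {
                if (current.time + inner.time < cutoff) {
                    result.add(inner.value);
                }
            }
        }
        return result;
    }

    /** Three search queries; each yields two results, 150 ms and 250 ms after it is issued. */
    public static List<String> demo() {
        List<Timed<String>> queries = Arrays.asList(
                new Timed<String>(0, "a"),
                new Timed<String>(100, "ab"),
                new Timed<String>(400, "abc"));
        Function<String, List<Timed<String>>> search = q -> Arrays.asList(
                new Timed<String>(150, q + "-1"),
                new Timed<String>(250, q + "-2"));
        return switchMap(queries, search);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `main` prints `[ab-1, ab-2, abc-1, abc-2]`. The results for "a" never appear because "ab" arrives at 100 ms, before the first result for "a" would be emitted at 150 ms. This is the same pattern that the search-box examples further down rely on.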

Code examples

Code example source: konmik/nucleus

.subscribe(subject);
return view
  .switchMap(new Func1<View, Observable<Delivery<View, T>>>() {
    @Override
    public Observable<Delivery<View, T>> call(final View view) {

Code example source: konmik/nucleus

  @Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
    return observable.materialize()
      .take(1)
      .switchMap(new Func1<Notification<T>, Observable<? extends Delivery<View, T>>>() {
        @Override
        public Observable<? extends Delivery<View, T>> call(final Notification<T> notification) {
          return view.map(new Func1<View, Delivery<View, T>>() {
            @Override
            public Delivery<View, T> call(View view) {
              return view == null ? null : new Delivery<>(view, notification);
            }
          });
        }
      })
      .filter(new Func1<Delivery<View, T>, Boolean>() {
        @Override
        public Boolean call(Delivery<View, T> delivery) {
          return delivery != null;
        }
      })
      .take(1);
  }
}

Code example source: cn-ljb/rxjava_for_android

.switchMap(new Func1<TextViewTextChangeEvent, Observable<List<String>>>() {
  @Override
  public Observable<List<String>> call(TextViewTextChangeEvent textViewTextChangeEvent) {

Code example source: com.netflix.eureka/eureka2-core

protected <T> Observable<T> submitForResult(final Callable<Observable<T>> task) {
  return Observable.create(new Observable.OnSubscribe<Observable<T>>() {
    @Override
    public void call(Subscriber<? super Observable<T>> subscriber) {
      addAndSchedule(new InvokerTaskWithResult<>(task, subscriber));
    }
  }).switchMap(new Func1<Observable<T>, Observable<? extends T>>() {
    @Override
    public Observable<? extends T> call(Observable<T> tObservable) {
      return tObservable;
    }
  });
}

Code example source: com.netflix.eureka2/eureka-write-server

@Override
public Observable<Void> update(final InstanceInfo newInfo) {
  if (state.get() == STATE.Closed) {
    return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  }
  return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
    @Override
    public Observable<Void> call(MessageConnection connection) {
      return connection.submit(new UpdateCopy(newInfo));
    }
  });
}

Code example source: com.netflix.eureka2/eureka-write-server

@Override
public Observable<Void> register(final InstanceInfo instanceInfo) {
  if (state.get() == STATE.Closed) {
    return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  }
  return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
    @Override
    public Observable<Void> call(MessageConnection connection) {
      return connection.submit(new RegisterCopy(instanceInfo));
    }
  });
}

Code example source: com.nytimes.android/store

  @Override
  public Observable<T> call(Observable<T> upstream) {
    return upstream.repeatWhen(events -> events.switchMap(aVoid -> source));
  }
}

Code example source: pakoito/RxComprehensions

  @Override
  public Observable<R> call(final A a) {
    return one.call(a).switchMap(new Func1<B, Observable<R>>() {
      @Override
      public Observable<R> call(final B b) {
        return two.call(a, b);
      }
    });
  }
});

Code example source: com.netflix.eureka2/eureka-write-server

  @Override
  public Observable<Void> unregister(final String instanceId) {
    if (state.get() == STATE.Closed) {
      return Observable.error(CHANNEL_CLOSED_EXCEPTION);
    }

    return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
      @Override
      public Observable<Void> call(MessageConnection connection) {
        return connection.submit(new UnregisterCopy(instanceId));
      }
    });
  }
}

Code example source: pakoito/RxComprehensions

/**
 * Composes an {@link rx.Observable} from multiple creation functions chained by {@link Observable#switchMap(Func1)}.
 *
 * @return composed Observable
 */
public static <A, R> Observable<R> doSwitchMap(
    final Func0<Observable<A>> zero,
    final Func1<A, Observable<R>> one) {
  return zero.call().switchMap(new Func1<A, Observable<R>>() {
    @Override
    public Observable<R> call(final A a) {
      return one.call(a);
    }
  });
}

Code example source: pakoito/RxComprehensions

  @Override
  public Observable<R> call(final A a) {
    return one.call(a).switchMap(new Func1<B, Observable<R>>() {
      @Override
      public Observable<R> call(final B b) {
        return two.call(a, b).switchMap(new Func1<C, Observable<R>>() {
          @Override
          public Observable<R> call(final C c) {
            return three.call(a, b, c);
          }
        });
      }
    });
  }
});

Code example source: com.netflix.eureka2/eureka-client

@Override
public Observable<Void> update(final InstanceInfo newInfo) {
  STATES currentState = state.get();
  switch (currentState) {
    case Idle:
      return Observable.error(INSTANCE_NOT_REGISTERED_EXCEPTION);
    case Registered:
      //TODO: Need to serialize register -> update -> unregister. With this code both they can be interleaved
      return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
        @Override
        public Observable<? extends Void> call(MessageConnection connection) {
          return connection.submitWithAck(new Update(newInfo));
        }
      });
    case Closed:
      return Observable.error(CHANNEL_CLOSED_EXCEPTION);
    default:
      return Observable.error(new IllegalStateException("Unrecognized channel state: " + currentState));
  }
}

Code example source: com.netflix.eureka/eureka2-write-server

@Override
public Observable<Void> unregister(final String instanceId) {
  if (state.get() != STATE.Connected) {
    return invalidStateError();
  }
  return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
    @Override
    public Observable<Void> call(MessageConnection connection) {
      return sendOnConnection(connection, new UnregisterCopy(instanceId));
    }
  });
}

Code example source: com.netflix.eureka/eureka2-write-server

@Override
public Observable<Void> register(final InstanceInfo instanceInfo) {
  if (state.get() != STATE.Connected) {
    return invalidStateError();
  }
  return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
    @Override
    public Observable<Void> call(MessageConnection connection) {
      return sendOnConnection(connection, new RegisterCopy(instanceInfo));
    }
  });
}

Code example source: com.netflix.eureka/eureka2-client

@Override
public Observable<Void> register(final InstanceInfo instanceInfo) {
  if (!moveToState(STATE.Idle, STATE.Registered) && state.get() != STATE.Registered) {
    STATE currentState = state.get();
    if (currentState == STATE.Closed) {
      return Observable.error(CHANNEL_CLOSED_EXCEPTION);
    } else {
      return Observable.error(new IllegalStateException(
          "Error advancing to state Registered from state " + currentState));
    }
  }
  return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
    @Override
    public Observable<? extends Void> call(MessageConnection connection) {
      return sendExpectAckOnConnection(connection, new Register(instanceInfo));
    }
  });
}

Code example source: com.netflix.eureka2/eureka-client

@Override
public Observable<Void> unregister() {
  if (!moveToState(STATES.Registered, STATES.Closed)) {
    STATES currentState = state.get();
    if (currentState == STATES.Idle) {
      return Observable.error(INSTANCE_NOT_REGISTERED_EXCEPTION);
    }
    if (currentState == STATES.Closed) {
      return Observable.error(CHANNEL_CLOSED_EXCEPTION);
    }
    return Observable.error(new IllegalStateException("Unrecognized channel state: " + currentState));
  }
  //TODO: Need to serialize register -> update -> unregister. With this code both they can be interleaved
  return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
    @Override
    public Observable<? extends Void> call(MessageConnection connection) {
      return connection.submitWithAck(Unregister.INSTANCE);
    }
  });
}

Code example source: com.netflix.eureka2/eureka-client

@Override
public Observable<Void> register(final InstanceInfo instanceInfo) {
  if (!moveToState(STATES.Idle, STATES.Registered)) {// State check. Only register if the state is Idle.
    STATES currentState = state.get();
    switch (currentState) {
      case Registered:
        return Observable.error(INSTANCE_ALREADY_REGISTERED_EXCEPTION);
      case Closed:
        return Observable.error(CHANNEL_CLOSED_EXCEPTION);
    }
  }
  //TODO: Need to serialize register -> update -> unregister. With this code both they can be interleaved
  return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
    @Override
    public Observable<? extends Void> call(MessageConnection connection) {
      return connection.submitWithAck(new Register(instanceInfo));
    }
  });
}

Code example source: patrick-doyle/android-rxmvp-tutorial

private Subscription observeLookupButton() {
  return view.observeButton()
    .doOnNext(__ -> view.showLoading(true))
    .map(__ -> view.getUsernameEdit())
    .observeOn(Schedulers.io())
    .switchMap(username -> model.getUserReops(username))
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(gitHubRepoList -> model.saveRepoListState(gitHubRepoList))
    .doOnEach(__ -> view.showLoading(false))
    .retry()
    .subscribe(gitHubRepoList -> {
      model.startRepoActivity(gitHubRepoList);
    });
}
}

Code example source: akarnokd/akarnokd-misc

  @Test
  public void test() throws Exception {
    // Each repetition waits randomTime() seconds, then emits the timer tick as a String.
    Observable<String> delay = Observable.just("")
        .switchMap(dummy -> Observable.timer(randomTime(), TimeUnit.SECONDS))
        .map(a -> String.valueOf(a))
        .repeat();

    Observable<String> messages = Observable.just("Test") // eventually lines from a file...
        .repeat();

    // zipWith passes the item from messages first and the item from delay second.
    messages.zipWith(delay, (msg, d) -> msg + " " + d).subscribe(System.out::println);

    Thread.sleep(10000);
  }
}

Code example source: akarnokd/akarnokd-misc

  static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
    return f ->
      f.publish(g ->
        g.take(1)
        .concatWith(
          g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
          .take(1)
          .ignoreElements()
        )
        .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
      )
      ;
  }
}
