Usage and code examples of the io.reactivex.Observable.cast() method

Reposted by x33g5p2x on 2022-01-25 in Other

This article collects code examples for the Java method io.reactivex.Observable.cast() and shows how it is used in practice. The examples are drawn mainly from projects published on GitHub, Stack Overflow, and Maven. Details of Observable.cast() are as follows:
Package: io.reactivex
Class name: Observable
Method name: cast

About Observable.cast

Returns an Observable that emits the items emitted by the source ObservableSource, converted to the specified type. As the test examples below show, an item that is not an instance of that type is reported to the observer as a ClassCastException through onError, and passing null as the class throws a NullPointerException.

Scheduler: cast does not operate by default on a particular Scheduler.
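
As a rough illustration of the description above, the sketch below models the operator in plain Java, without the RxJava dependency. It assumes that cast behaves like applying Class.cast to each item in turn, so the first item of the wrong type ends the sequence with a ClassCastException. CastSketch and castAll are hypothetical names used only for this illustration and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for illustration only; this is not RxJava code.
final class CastSketch {

    // Casts every item to clazz, one item at a time.
    // Throws NullPointerException if clazz is null, and ClassCastException
    // at the first item that is not an instance of clazz.
    static <U> List<U> castAll(List<?> items, Class<U> clazz) {
        Objects.requireNonNull(clazz, "clazz is null");
        List<U> out = new ArrayList<>();
        for (Object item : items) {
            out.add(clazz.cast(item));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> source = Arrays.<Object>asList(1, 2);

        // Every item is an Integer, so the cast succeeds.
        List<Integer> ints = castAll(source, Integer.class);
        System.out.println(ints); // prints [1, 2]

        // No item is a Boolean, so the first item already fails.
        try {
            castAll(source, Boolean.class);
        } catch (ClassCastException e) {
            System.out.println("wrong type"); // prints wrong type
        }
    }
}
```

The main difference in the real operator is where the failure surfaces: the test examples in this article show the ClassCastException being delivered to the observer's onError rather than thrown at the caller.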

Code examples

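Code example source: ReactiveX/RxJava

  // Fragment: the enclosing call that receives this Function is not shown.
  @Override
  public Observable<Object> apply(Observable<? extends Throwable> t1) {
    // Map every error to 0, prepend a 0, then widen the items to Object.
    return t1.map(new Function<Throwable, Integer>() {
      @Override
      public Integer apply(Throwable error) {
        return 0;
      }
    }).startWith(0).cast(Object.class);
  }
}).subscribe(observer);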

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void castNull() {
  just1.cast(null);
}

Code example source: ReactiveX/RxJava

@Test
public void testCastWithWrongType() {
  Observable<?> source = Observable.just(1, 2);
  Observable<Boolean> observable = source.cast(Boolean.class);

  Observer<Boolean> observer = TestHelper.mockObserver();

  observable.subscribe(observer);

  // The items are Integers, so the cast to Boolean fails and onError is called.
  verify(observer, times(1)).onError(
      any(ClassCastException.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testCast() {
  Observable<?> source = Observable.just(1, 2);
  Observable<Integer> observable = source.cast(Integer.class);
  Observer<Integer> observer = TestHelper.mockObserver();
  observable.subscribe(observer);
  // Both source items must arrive exactly once, with no error.
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(2);
  verify(observer, never()).onError(
      any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

  // Fragment: the enclosing call that receives this Function is not shown.
  @Override
  public Observable<Object> apply(Observable<? extends Throwable> attempts) {
    return attempts
      // Pair each error with a count of 1.
      .map(new Function<Throwable, Tuple>() {
        @Override
        public Tuple apply(Throwable n) {
          return new Tuple(Long.valueOf(1L), n);
        }
      })
      // Keep a running total of errors together with the latest error.
      .scan(new BiFunction<Tuple, Tuple, Tuple>() {
        @Override
        public Tuple apply(Tuple t, Tuple n) {
          return new Tuple(t.count + n.count, n.n);
        }
      })
      // Fail once the count exceeds 20; otherwise wait t.count milliseconds.
      .flatMap(new Function<Tuple, Observable<Long>>() {
        @Override
        public Observable<Long> apply(Tuple t) {
          System.out.println("Retry # " + t.count);
          return t.count > 20
            ? Observable.<Long>error(t.n)
            : Observable.timer(t.count * 1L, TimeUnit.MILLISECONDS);
        }
      })
      .cast(Object.class);
  }
}).subscribe(to);

Code example source: ReactiveX/RxJava

/**
 * Filters the items emitted by an ObservableSource, only emitting those of the specified type.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ofClass.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code ofType} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <U> the output type
 * @param clazz
 *            the class type to filter the items emitted by the source ObservableSource
 * @return an Observable that emits items from the source ObservableSource of type {@code clazz}
 * @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> ofType(final Class<U> clazz) {
  ObjectHelper.requireNonNull(clazz, "clazz is null");
  return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}
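
The ofType method above filters by type before casting, which is why it drops mismatching items instead of failing on them. The sketch below mirrors that filter-then-cast order in plain Java with java.util.stream, as an analogy rather than RxJava code. OfTypeSketch and its ofType helper are hypothetical names introduced for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for illustration only; this is not RxJava code.
final class OfTypeSketch {

    // Keeps only the items that are instances of clazz, then casts them.
    // Because the filter runs first, the cast can never fail.
    static <U> List<U> ofType(List<?> items, Class<U> clazz) {
        return items.stream()
                .filter(clazz::isInstance)
                .map(clazz::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.<Object>asList(1, "two", 3);

        // The String is filtered out before the cast is applied.
        System.out.println(ofType(mixed, Integer.class)); // prints [1, 3]
    }
}
```

A cast without the preceding filter would stop at "two" with a ClassCastException, so filtering first is the safer choice when the source may contain mixed types.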

Code example source: redisson/redisson

The code is identical to the ofType method from ReactiveX/RxJava shown above.

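Code example source: leftcoding/GankLy

/**
 * Returns an Observable of the given event type for messages that carry the given code.
 *
 * @param code      the event code
 * @param eventType the event type
 * @param <T>       the type of the emitted events
 * @return an Observable that emits the payloads of messages whose code matches
 *         and whose payload is an instance of eventType
 */
public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
  return bus.ofType(Message.class)
      .filter(message -> message.getCode() == code && eventType.isInstance(message.getObject()))
      .map(message -> message.getObject())
      .cast(eventType);
}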

Code example source: Cognifide/knotx

private void deployVerticles(JsonObject config, Future<Void> completion) {
 LOGGER.info("STARTING Knot.x {} @ {}", Version.getVersion(), Version.getBuildTime());
 Observable.fromIterable(config.getJsonArray(MODULES_ARRAY))
   .cast(String.class)
   .map(ModuleDescriptor::parse)
   .flatMap(item -> deployVerticle(config, item))
   .reduce(new ArrayList<ModuleDescriptor>(), (accumulator, item) -> {
    accumulator.add(item);
    return accumulator;
   })
   .subscribe(
     deployments -> {
      deployedModules = Lists.newArrayList(deployments);
      LOGGER.info("Knot.x STARTED {}", buildMessage());
      if (completion != null) {
       completion.complete();
      }
     },
     error -> {
      LOGGER.error("Verticle could not be deployed", error);
      if (completion != null) {
       completion.fail(error);
      }
     }
   );
}

Code example source: io.knotx/knotx-core

The code is identical to the deployVerticles method from Cognifide/knotx shown above.

Code example source: cescoffier/vertx-kubernetes-workshop

/**
 * This method is called when the verticle is deployed.
 */
@Override
public void start(Future<Void> future) {
  discovery = ServiceDiscovery.create(vertx);
  ConfigRetriever retriever = ConfigRetriever.create(vertx, getConfigurationOptions());
  retriever.rxGetConfig()
    // Read the configuration, and deploy a MarketDataVerticle for each company listed in the configuration.
    .flatMap(config ->
      Observable.fromIterable(config.getJsonArray("companies"))
        .cast(JsonObject.class)
        // Deploy the verticle with a configuration.
        .flatMapSingle(company -> vertx.rxDeployVerticle(MarketDataVerticle.class.getName(),
          new DeploymentOptions().setConfig(company)))
        .toList()
    )
    // Deploy another verticle
    .flatMap(l -> vertx.rxDeployVerticle(RestQuoteAPIVerticle.class.getName()))
    // Expose the market-data message source
    .flatMap(x -> discovery.rxPublish(MessageSource.createRecord("market-data", ADDRESS)))
    .subscribe((rec, err) -> {
      if (rec != null) {
        this.record = rec;
        future.complete();
      } else {
        future.fail(err);
      }
    });
}

Code example source: akarnokd/akarnokd-misc

@Test
public void test() throws Exception {
  Observable<String> first = Observable.fromCallable(() -> "HEY").delay(250, TimeUnit.MILLISECONDS);
  Observable<Integer> second = Observable.fromCallable(() -> 1).delay(350, TimeUnit.MILLISECONDS);
  List<Observable<?>> observables = com.google.common.collect.Lists.newArrayList(first, second);
  Map<Long, Object> someWeirdMapWithObject = com.google.common.collect.ImmutableMap.of(
      1L, new BrandBuilder(1),
      2L, new BrandBuilder(2)
  );
  Observable
      .fromIterable(observables)
      .flatMap(task -> task.observeOn(Schedulers.computation()))
      // wait for all tasks to finish
      .lastOrError()
      .flattenAsObservable(x -> someWeirdMapWithObject.values())
      .<BrandBuilder>cast(BrandBuilder.class)
      .map(BrandBuilder::build)
      .toList().blockingGet();
}
