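This article collects code examples for the Java method io.reactivex.Observable.cast() and shows how Observable.cast() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. Details of Observable.cast() are as follows:
Fully qualified class name: io.reactivex.Observable
Class name: Observable
Method name: cast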
Returns an Observable that emits the items emitted by the source ObservableSource, converted to the specified type.
Scheduler: cast does not operate by default on a particular Scheduler.
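The conversion is a per-item runtime check. An item that is an instance of the target class passes through unchanged, and a mismatched item produces a ClassCastException, which RxJava delivers through onError. The sketch below illustrates only that per-item rule. It uses java.util.stream as a stand-in so that it runs without the RxJava dependency, so it is an analogy and not RxJava's implementation. The class name CastSketch and the helper castAll are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CastSketch {

    // Hypothetical helper (not an RxJava API): applies Class.cast to every
    // item, the same per-item check a cast(Class) operator relies on.
    public static <T> List<T> castAll(List<?> items, Class<T> clazz) {
        return items.stream()
                .map(clazz::cast) // throws ClassCastException on a mismatch
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> source = Arrays.<Object>asList(1, 2);

        // Every item is an Integer, so the cast succeeds.
        List<Integer> ints = castAll(source, Integer.class);
        System.out.println(ints);

        // No item is a Boolean, so the very first item fails the check.
        try {
            castAll(source, Boolean.class);
        } catch (ClassCastException e) {
            System.out.println("cast failed with " + e.getClass().getSimpleName());
        }
    }
}
```

The main difference from the real operator is delivery: a stream throws the exception to the caller, whereas an Observable signals it to the subscriber via onError and stops emitting.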
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Object> apply(Observable<? extends Throwable> t1) {
        return t1.map(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable t1) {
                return 0;
            }
        // Widen Observable<Integer> to the declared Observable<Object> return type.
        }).startWith(0).cast(Object.class);
    }
}).subscribe(observer);
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void castNull() {
    // A null class token is rejected with a NullPointerException.
    just1.cast(null);
}
Code example source: ReactiveX/RxJava
@Test
public void testCastWithWrongType() {
    Observable<?> source = Observable.just(1, 2);
    // The items are Integers, so casting them to Boolean must fail.
    Observable<Boolean> observable = source.cast(Boolean.class);
    Observer<Boolean> observer = TestHelper.mockObserver();
    observable.subscribe(observer);
    verify(observer, times(1)).onError(
            any(ClassCastException.class));
}
}
Code example source: ReactiveX/RxJava
@Test
public void testCast() {
    Observable<?> source = Observable.just(1, 2);
    Observable<Integer> observable = source.cast(Integer.class);
    Observer<Integer> observer = TestHelper.mockObserver();
    observable.subscribe(observer);
    // Both source items must arrive, each exactly once.
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(2);
    verify(observer, never()).onError(
            any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Object> apply(Observable<? extends Throwable> attempts) {
        // Worker w = Schedulers.computation().createWorker();
        return attempts
            // Pair each error with a count of 1.
            .map(new Function<Throwable, Tuple>() {
                @Override
                public Tuple apply(Throwable n) {
                    return new Tuple(Long.valueOf(1), n);
                }})
            // Accumulate the counts to get the running retry number.
            .scan(new BiFunction<Tuple, Tuple, Tuple>() {
                @Override
                public Tuple apply(Tuple t, Tuple n) {
                    return new Tuple(t.count + n.count, n.n);
                }})
            // Give up after 20 retries; otherwise wait t.count milliseconds.
            .flatMap(new Function<Tuple, Observable<Long>>() {
                @Override
                public Observable<Long> apply(Tuple t) {
                    System.out.println("Retry # " + t.count);
                    return t.count > 20 ?
                        Observable.<Long>error(t.n) :
                        Observable.timer(t.count * 1L, TimeUnit.MILLISECONDS);
                }}).cast(Object.class);
    }
}).subscribe(to);
Code example source: ReactiveX/RxJava
/**
 * Filters the items emitted by an ObservableSource, only emitting those of the specified type.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ofClass.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code ofType} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <U> the output type
 * @param clazz
 *            the class type to filter the items emitted by the source ObservableSource
 * @return an Observable that emits items from the source ObservableSource of type {@code clazz}
 * @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> ofType(final Class<U> clazz) {
    ObjectHelper.requireNonNull(clazz, "clazz is null");
    return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}
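The ofType method above composes two steps: it first drops items that are not instances of clazz, then casts the survivors, so the cast never fails. The sketch below mirrors that filter-then-cast composition. It uses java.util.stream as a stand-in so that it runs without the RxJava dependency. The class name OfTypeSketch and its static ofType helper are hypothetical and are not RxJava code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class OfTypeSketch {

    // Hypothetical stand-in for the filter(isInstanceOf).cast(clazz) chain.
    public static <U> List<U> ofType(List<?> items, Class<U> clazz) {
        Objects.requireNonNull(clazz, "clazz is null");
        return items.stream()
                .filter(clazz::isInstance) // keep only instances of clazz
                .map(clazz::cast)          // safe: every remaining item matches
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.<Object>asList(1, "two", 3, 4.0);

        // The String and the Double are dropped instead of raising an error.
        System.out.println(ofType(mixed, Integer.class)); // prints [1, 3]
    }
}
```

This is the practical difference between the two operators: cast alone treats a mismatched item as an error, while ofType silently skips it.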
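Code example source: leftcoding/GankLy
/**
 * Returns an Observable of the given type (eventType) for messages that match the given code.
 *
 * @param code      the event code
 * @param eventType the event type
 * @param <T>       the type of the event payload
 * @return an Observable that emits the payloads of matching messages, cast to eventType
 */
public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
    return bus.ofType(Message.class)
            // Keep only messages with the requested code whose payload is an eventType instance.
            .filter(message -> message.getCode() == code && eventType.isInstance(message.getObject()))
            .map(message -> message.getObject())
            .cast(eventType);
}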
Code example source: Cognifide/knotx
private void deployVerticles(JsonObject config, Future<Void> completion) {
    LOGGER.info("STARTING Knot.x {} @ {}", Version.getVersion(), Version.getBuildTime());
    Observable.fromIterable(config.getJsonArray(MODULES_ARRAY))
        // JsonArray iterates as Object, so narrow each element to String.
        .cast(String.class)
        .map(ModuleDescriptor::parse)
        .flatMap(item -> deployVerticle(config, item))
        .reduce(new ArrayList<ModuleDescriptor>(), (accumulator, item) -> {
            accumulator.add(item);
            return accumulator;
        })
        .subscribe(
            deployments -> {
                deployedModules = Lists.newArrayList(deployments);
                LOGGER.info("Knot.x STARTED {}", buildMessage());
                if (completion != null) {
                    completion.complete();
                }
            },
            error -> {
                LOGGER.error("Verticle could not be deployed", error);
                if (completion != null) {
                    completion.fail(error);
                }
            }
        );
}
Code example source: cescoffier/vertx-kubernetes-workshop
/**
 * This method is called when the verticle is deployed.
 */
@Override
public void start(Future<Void> future) {
    discovery = ServiceDiscovery.create(vertx);
    ConfigRetriever retriever = ConfigRetriever.create(vertx, getConfigurationOptions());
    retriever.rxGetConfig()
        // Read the configuration, and deploy a MarketDataVerticle for each company listed in the configuration.
        .flatMap(config ->
            Observable.fromIterable(config.getJsonArray("companies"))
                .cast(JsonObject.class)
                // Deploy the verticle with a configuration.
                .flatMapSingle(company -> vertx.rxDeployVerticle(MarketDataVerticle.class.getName(),
                    new DeploymentOptions().setConfig(company)))
                .toList()
        )
        // Deploy another verticle
        .flatMap(l -> vertx.rxDeployVerticle(RestQuoteAPIVerticle.class.getName()))
        // Expose the market-data message source
        .flatMap(x -> discovery.rxPublish(MessageSource.createRecord("market-data", ADDRESS)))
        .subscribe((rec, err) -> {
            if (rec != null) {
                this.record = rec;
                future.complete();
            } else {
                future.fail(err);
            }
        });
}
Code example source: akarnokd/akarnokd-misc
@Test
public void test() throws Exception {
    Observable<String> first = Observable.fromCallable(() -> "HEY").delay(250, TimeUnit.MILLISECONDS);
    Observable<Integer> second = Observable.fromCallable(() -> 1).delay(350, TimeUnit.MILLISECONDS);
    List<Observable<?>> observables = com.google.common.collect.Lists.newArrayList(first, second);
    Map<Long, Object> someWeirdMapWithObject = com.google.common.collect.ImmutableMap.of(
            1L, new BrandBuilder(1),
            2L, new BrandBuilder(2)
    );
    Observable
        .fromIterable(observables)
        .flatMap(task -> task.observeOn(Schedulers.computation()))
        // wait for all tasks to finish
        .lastOrError()
        .flattenAsObservable(x -> someWeirdMapWithObject.values())
        // The map values are typed as Object, so narrow them before calling build().
        .<BrandBuilder>cast(BrandBuilder.class)
        .map(BrandBuilder::build)
        .toList().blockingGet();
}