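This article collects code examples for the Java class rx.subjects.Subject and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they can serve as a practical reference. Details of the Subject class are as follows: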
Package path: rx.subjects.Subject
Class name: Subject
Represents an object that is both an Observable and an Observer.
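To make the "both an Observable and an Observer" idea concrete, here is a minimal plain-Java sketch. It does not use RxJava and is not how the library implements Subject; `MiniSubject` and its nested `Observer` interface are hypothetical names invented for this illustration. The method names (`subscribe`, `onNext`, `onCompleted`, `hasObservers`) simply mirror the ones that appear in the snippets below. The sketch is single-threaded and omits error handling and backpressure.

```java
import java.util.ArrayList;
import java.util.List;

class MiniSubjectDemo {

    // Hypothetical observer contract, named after the RxJava 1.x callbacks.
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // A subject is an Observer (it accepts events) and also lets others
    // subscribe to it (it re-emits those events to its own observers).
    static final class MiniSubject<T> implements Observer<T> {
        private final List<Observer<T>> observers = new ArrayList<>();
        private boolean done;

        void subscribe(Observer<T> observer) {
            if (done) {
                // Late subscribers only see the terminal event.
                observer.onCompleted();
                return;
            }
            observers.add(observer);
        }

        boolean hasObservers() {
            return !observers.isEmpty();
        }

        @Override
        public void onNext(T value) {
            if (done) {
                return; // events after completion are ignored
            }
            for (Observer<T> o : new ArrayList<>(observers)) {
                o.onNext(value);
            }
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            List<Observer<T>> snapshot = new ArrayList<>(observers);
            observers.clear();
            for (Observer<T> o : snapshot) {
                o.onCompleted();
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSubject<Integer> subject = new MiniSubject<>();

        subject.onNext(0); // nobody is subscribed yet, so this value is dropped

        subject.subscribe(new Observer<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("next:" + value);
            }

            @Override
            public void onCompleted() {
                log.add("completed");
            }
        });

        subject.onNext(1);
        subject.onNext(2);
        subject.onCompleted();
        subject.onNext(3); // ignored: the subject has already completed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, next:2, completed]
    }
}
```

The drop-before-subscribe behavior in this sketch resembles what RxJava 1.x's PublishSubject does. Other Subject variants in the library replay or cache values, which this sketch does not attempt to model.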
Code example source: PipelineAI/pipeline
public void write(HystrixCommandExecutionStarted event) {
writeOnlySubject.onNext(event);
}
Code example source: PipelineAI/pipeline
public void shutdown() {
writeOnlyCommandStartSubject.onCompleted();
writeOnlyCommandCompletionSubject.onCompleted();
writeOnlyCollapserSubject.onCompleted();
}
Code example source: HotBitmapGG/bilibili-android-client
/**
* Receive messages
*/
public <T> Observable<T> toObserverable(Class<T> eventType) {
return bus.ofType(eventType);
}
}
Code example source: akarnokd/akarnokd-misc
@Test
public void test2() throws Exception {
Subject<Long, Long> subject = PublishSubject.create();
Observable<Long> initialObservable = subject.share()
.map(value -> {
System.out.println("Received value " + value);
new Exception().printStackTrace(System.out);
return value;
});
Observable<Long> timeoutObservable = initialObservable.map(value -> {
System.out.println("Timeout received value " + value);
return value;
});
TestSubscriber<Long> subscriber = new TestSubscriber<>();
initialObservable
.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
.timeout(1, TimeUnit.SECONDS, timeoutObservable).subscribe(subscriber);
subject.onNext(5L);
Thread.sleep(1500);
subject.onNext(10L);
subject.onCompleted();
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();
subscriber.assertValues(5L, 10L);
}
Code example source: com.couchbase.client/core-io
@Override
public void onNext(T t) {
state.bufferedSubject.onNext(t);
// Schedule timeout once and when not subscribed yet.
if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
state.setTimeoutSubscription(timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives.
@Override
public void call(Long aLong) {
disposeIfNotSubscribed();
}
}));
}
}
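Code example source: laotan7237/EasyReader
/**
* Returns an Observable of the given type (eventType), filtered by the given code and eventType.
* An observer registered with code 0 and class voidMessage will not receive voidMessage events whose code is not 0.
* @param code the event code
* @param eventType the event type
* @param <T> the type of the event payload
* @return an Observable that emits only the matching events
*/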
public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
return _bus.ofType(RxBusBaseMessage.class)
.filter(new Func1<RxBusBaseMessage,Boolean>() {
@Override
public Boolean call(RxBusBaseMessage o) {
// Keep only events whose code and eventType both match
return o.getCode() == code && eventType.isInstance(o.getObject());
}
}).map(new Func1<RxBusBaseMessage,Object>() {
@Override
public Object call(RxBusBaseMessage o) {
return o.getObject();
}
}).cast(eventType);
}
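The code-plus-type filtering described in the Javadoc above can be illustrated without RxJava. The following is a plain-Java sketch under stated assumptions: `TinyBus`, `Message`, `listen`, and `post` are hypothetical names made up for this example and are not part of EasyReader or RxJava. It only demonstrates the rule that a listener registered for code 0 and a given class receives nothing with a different code or a different payload type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CodeFilteredBusDemo {

    // Hypothetical envelope pairing an integer code with an arbitrary payload.
    static final class Message {
        final int code;
        final Object payload;

        Message(int code, Object payload) {
            this.code = code;
            this.payload = payload;
        }
    }

    // Hypothetical single-threaded bus; every listener sees every message
    // and decides for itself whether the message matches.
    static final class TinyBus {
        private final List<Consumer<Message>> listeners = new ArrayList<>();

        void post(int code, Object payload) {
            Message message = new Message(code, payload);
            for (Consumer<Message> listener : new ArrayList<>(listeners)) {
                listener.accept(message);
            }
        }

        <T> void listen(int code, Class<T> eventType, Consumer<T> handler) {
            listeners.add(message -> {
                // Same predicate as the snippet above: code AND type must match.
                if (message.code == code && eventType.isInstance(message.payload)) {
                    handler.accept(eventType.cast(message.payload));
                }
            });
        }
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        TinyBus bus = new TinyBus();
        bus.listen(0, String.class, received::add);

        bus.post(0, "hello");      // delivered: code and type match
        bus.post(1, "other code"); // skipped: code differs
        bus.post(0, 42);           // skipped: payload is not a String
        bus.post(0, "world");      // delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hello, world]
    }
}
```

In the RxJava version, the same predicate sits inside `filter`, and `cast(eventType)` plays the role of `eventType.cast(...)` here.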
Code example source: com.netflix.eureka/eureka2-client
@Override
public Observable<Boolean> register(InstanceInfo instanceInfo, Source source) {
relay.onNext(new ChangeNotification<>(ChangeNotification.Kind.Add ,instanceInfo));
return Observable.just(true);
}
Code example source: akarnokd/akarnokd-misc
public static <T, R> Observable.Transformer<T, R> switchFlatMap(
int n, Func1<T, Observable<R>> mapper) {
return f ->
Observable.defer(() -> {
final AtomicInteger ingress = new AtomicInteger();
final Subject<Integer, Integer> cancel =
PublishSubject.<Integer>create().toSerialized();
return f.flatMap(v -> {
int id = ingress.getAndIncrement();
Observable<R> o = mapper.call(v)
.takeUntil(cancel.filter(e -> e == id + n));
cancel.onNext(id);
return o;
});
})
;
}
}
Code example source: com.netflix.eureka/eureka2-client
@Override
public Observable<Void> shutdown() {
remoteBatchingRegistry.shutdown();
relay.onCompleted();
return Observable.empty();
}
Code example source: com.netflix.eureka/eureka2-client
@Override
public Observable<Void> shutdown(Throwable cause) {
remoteBatchingRegistry.shutdown();
relay.onError(cause);
return Observable.empty();
}
}
Code example source: akarnokd/RxJava2Interop
@Test
public void sj2ToSj1Normal() {
io.reactivex.subjects.PublishSubject<Integer> ps2 = io.reactivex.subjects.PublishSubject.create();
rx.subjects.Subject<Integer, Integer> sj1 = toV1Subject(ps2);
rx.observers.AssertableSubscriber<Integer> to = sj1.test();
assertTrue(sj1.hasObservers());
assertTrue(ps2.hasObservers());
sj1.onNext(1);
sj1.onNext(2);
sj1.onCompleted();
assertFalse(sj1.hasObservers());
assertFalse(ps2.hasObservers());
to.assertResult(1, 2);
}
Code example source: com.couchbase.client/core-io
@Override
public void call() {
try {
obs.onNext(response);
obs.onCompleted();
} catch(Exception ex) {
obs.onError(ex);
} finally {
worker.unsubscribe();
}
}
});
Code example source: akarnokd/RxJava2Interop
@Test
public void sj2ToSj1Lifecycle() {
io.reactivex.subjects.PublishSubject<Integer> pp2 = io.reactivex.subjects.PublishSubject.create();
rx.subjects.Subject<Integer, Integer> sj1 = toV1Subject(pp2);
rx.observers.AssertableSubscriber<Integer> to = sj1.test(0L);
assertTrue(sj1.hasObservers());
assertTrue(pp2.hasObservers());
sj1.onNext(1);
sj1.onError(new IOException());
assertFalse(sj1.hasObservers());
assertFalse(pp2.hasObservers());
assertFalse(pp2.hasComplete());
assertTrue(pp2.hasThrowable());
assertNotNull(pp2.getThrowable());
to.assertFailure(rx.exceptions.MissingBackpressureException.class);
}
Code example source: bravekingzhang/CleanArch
/**
* Called by the message sender
* @param o the event object to publish
*/
public void send(Object o) {
if (_bus.hasObservers()){
_bus.onNext(o);
}
}
Code example source: akarnokd/RxJava2Interop
@Test
public void sj2ToSj1Backpressured() {
io.reactivex.subjects.PublishSubject<Integer> pp2 = io.reactivex.subjects.PublishSubject.create();
rx.subjects.Subject<Integer, Integer> sj1 = toV1Subject(pp2);
rx.observers.AssertableSubscriber<Integer> to = sj1.test(0L);
assertTrue(sj1.hasObservers());
assertTrue(pp2.hasObservers());
sj1.onNext(1);
assertFalse(sj1.hasObservers());
assertFalse(pp2.hasObservers());
assertFalse(pp2.hasComplete());
assertFalse(pp2.hasThrowable());
assertNull(pp2.getThrowable());
to.assertFailure(rx.exceptions.MissingBackpressureException.class);
}
Code example source: com.couchbase.client/core-io
public void add(final StatResponse response) {
if (response.key() == null) {
// Skip NULL-terminator for successful response
if (!response.status().isSuccess()) {
observable().onNext(response);
}
observable().onCompleted();
} else {
observable().onNext(response);
}
}
Code example source: yahoo/fili
/**
* Handle publishing the length and an error to the Subject.
*
* @param stream Stream to get the length from
* @param t Error that was encountered (to be published)
*/
private void emitError(LengthOfOutputStream stream, Throwable t) {
Subject<Long, Long> lengthBroadcaster = stream.getLengthBroadcaster();
lengthBroadcaster.onNext(stream.getResponseLength());
lengthBroadcaster.onError(t);
}
Code example source: akarnokd/RxJava2Interop
@Override
public void onError(Throwable e) {
if (!terminated) {
if (e == null) {
e = new NullPointerException("Throwable was null");
}
error = e;
terminated = true;
source.onError(e);
} else {
io.reactivex.plugins.RxJavaPlugins.onError(e);
}
}
Code example source: com.netflix.eureka/eureka2-core
private void terminateLifecycle(Throwable e) {
if (e == null) {
lifecycleSubject.onCompleted();
} else {
lifecycleSubject.onError(e);
}
}
Code example source: cn-ljb/rxjava_for_android
/** Whether there are any observers */
public boolean hasObservers() {
return _bus.hasObservers();
}
}