io.reactivex.subjects.Subject类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(172)

本文整理了Java中io.reactivex.subjects.Subject类的一些代码示例,展示了Subject类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Subject类的具体详情如下:
包路径:io.reactivex.subjects.Subject
类名称:Subject

Subject介绍

[英]Represents an Observer and an Observable at the same time, allowing multicasting events from a single source to multiple child Observers.

All methods except the #onSubscribe(io.reactivex.disposables.Disposable), #onNext(Object), #onError(Throwable) and #onComplete() are thread-safe. Use #toSerialized() to make these methods thread-safe as well.
[中]同时表示观察者和可观察者,允许将事件从单个源多播到多个子观察者。
除了#onSubscribe(io.reactivex.disposables.Disposable)、onNext(Object)、onError(Throwable)和#onComplete()之外的所有方法都是线程安全的。使用#toSerialized()也可以使这些方法具有线程安全性。

代码示例

代码示例来源:origin: ReactiveX/RxJava

/**
 * Emit the given values and complete the Subject.
 * @param <T> the value type
 * @param p the target subject
 * @param values the values to emit
 */
public static <T> void emit(Subject<T> p, T... values) {
  for (T v : values) {
    p.onNext(v);
  }
  p.onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public boolean hasObservers() {
  return actual.hasObservers();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void run() {
    s.onError(ex);
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testPublishSubjectValueEmpty() {
  PublishSubject<Integer> async = PublishSubject.create();
  async.onComplete();
  Subject<Integer> serial = async.toSerialized();
  assertFalse(serial.hasObservers());
  assertTrue(serial.hasComplete());
  assertFalse(serial.hasThrowable());
  assertNull(serial.getThrowable());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onComplete() {
  DisposableHelper.replace(upstream, null);
  active = false;
  signaller.onNext(0);
}

代码示例来源:origin: akarnokd/RxJava2Interop

@Test
public void sj1ToSj2Lifecycle() {
  rx.subjects.PublishSubject<Integer> ps1 = rx.subjects.PublishSubject.create();
  io.reactivex.subjects.Subject<Integer> sj2 = toV2Subject(ps1);
  io.reactivex.observers.TestObserver<Integer> to = sj2.test();
  assertTrue(sj2.hasObservers());
  assertTrue(ps1.hasObservers());
  assertFalse(sj2.hasComplete());
  assertFalse(sj2.hasThrowable());
  assertNull(sj2.getThrowable());
  Disposable d1 = Disposables.empty();
  sj2.onSubscribe(d1);
  assertFalse(d1.isDisposed());
  sj2.onNext(1);
  sj2.onNext(2);
  sj2.onComplete();
  sj2.onComplete();
  sj2.onError(new IOException());
  sj2.onNext(3);
  Disposable d2 = Disposables.empty();
  sj2.onSubscribe(d2);
  assertFalse(d1.isDisposed());
  assertTrue(d2.isDisposed());
  assertFalse(sj2.hasObservers());
  assertFalse(ps1.hasObservers());
  assertTrue(sj2.hasComplete());
  assertFalse(sj2.hasThrowable());
  assertNull(sj2.getThrowable());
  to.assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normal() {
  Subject<Integer> s = PublishSubject.<Integer>create().toSerialized();
  TestObserver<Integer> to = s.test();
  Observable.range(1, 10).subscribe(s);
  to.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  assertFalse(s.hasObservers());
  s.onNext(11);
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    s.onError(new TestException());
    TestHelper.assertUndeliverable(errors, 0, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
  s.onComplete();
  Disposable bs = Disposables.empty();
  s.onSubscribe(bs);
  assertTrue(bs.isDisposed());
}

代码示例来源:origin: io.apptik.rhub/roxy-rxjava2

@Override
public void onError(Throwable e) {
  if (tePolicy.equals(WRAP)) {
    subj.onNext(new Event.ErrorEvent(e));
  } else if (tePolicy.equals(PASS)) {
    subj.onError(e);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void run() {
    s.onComplete();
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeWhileOnSubject1() {
  Subject<Integer> s = PublishSubject.create();
  Observable<Integer> take = s.takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer input) {
      return input < 3;
    }
  });
  Observer<Integer> observer = TestHelper.mockObserver();
  take.subscribe(observer);
  s.onNext(1);
  s.onNext(2);
  s.onNext(3);
  s.onNext(4);
  s.onNext(5);
  s.onComplete();
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(2);
  verify(observer, never()).onNext(3);
  verify(observer, never()).onNext(4);
  verify(observer, never()).onNext(5);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void countRestartsOnTimeTick() {
    TestScheduler scheduler = new TestScheduler();
    Subject<Integer> ps = PublishSubject.<Integer>create();

    TestObserver<Observable<Integer>> to = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
    .test();

    // window #1
    ps.onNext(1);
    ps.onNext(2);

    scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS);

    // window #2
    ps.onNext(3);
    ps.onNext(4);
    ps.onNext(5);
    ps.onNext(6);

    to.assertValueCount(2)
    .assertNoErrors()
    .assertNotComplete();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onNextNull() {
  Subject<T> p = create();
  try {
    p.onNext(null);
    fail("No NullPointerException thrown");
  } catch (NullPointerException ex) {
    assertEquals("onNext called with null. Null values are generally not allowed in 2.x operators and sources.", ex.getMessage());
  }
  p.test().assertEmpty().cancel();
}

代码示例来源:origin: akarnokd/RxJava2Interop

@Test
public void sj1ToSj2Error() {
  rx.subjects.PublishSubject<Integer> ps1 = rx.subjects.PublishSubject.create();
  io.reactivex.subjects.Subject<Integer> sj2 = toV2Subject(ps1);
  io.reactivex.observers.TestObserver<Integer> to = sj2.test();
  assertTrue(sj2.hasObservers());
  assertTrue(ps1.hasObservers());
  assertFalse(sj2.hasComplete());
  assertFalse(sj2.hasThrowable());
  assertNull(sj2.getThrowable());
  sj2.onError(new IOException());
  assertFalse(sj2.hasObservers());
  assertFalse(ps1.hasObservers());
  assertFalse(sj2.hasComplete());
  assertTrue(sj2.hasThrowable());
  assertNotNull(sj2.getThrowable());
  assertTrue(sj2.getThrowable() instanceof IOException);
  to.assertFailure(IOException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Observer<? super T> observer) {
  actual.subscribe(observer);
}

代码示例来源:origin: pwittchen/ReactiveNetwork

@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
 final String service = Context.CONNECTIVITY_SERVICE;
 final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
 networkCallback = createNetworkCallback(context);
 registerIdleReceiver(context);
 final NetworkRequest request =
   new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
     .addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
     .build();
 manager.registerNetworkCallback(request, networkCallback);
 return connectivitySubject.toFlowable(BackpressureStrategy.LATEST).doOnCancel(new Action() {
  @Override public void run() {
   tryToUnregisterCallback(manager);
   tryToUnregisterReceiver(context);
  }
 }).startWith(Connectivity.create(context)).distinctUntilChanged().toObservable();
}

代码示例来源:origin: akarnokd/RxJava2Interop

@Test
public void sj1ToSj2NullException() {
  rx.subjects.PublishSubject<Integer> ps1 = rx.subjects.PublishSubject.create();
  io.reactivex.subjects.Subject<Integer> sj2 = toV2Subject(ps1);
  io.reactivex.observers.TestObserver<Integer> to = sj2.test();
  sj2.onError(null);
  assertFalse(sj2.hasObservers());
  assertFalse(ps1.hasObservers());
  to.assertFailure(NullPointerException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void onErrorNull() {
    Subject<T> p = create();

    try {
      p.onError(null);
      fail("No NullPointerException thrown");
    } catch (NullPointerException ex) {
      assertEquals("onError called with null. Null values are generally not allowed in 2.x operators and sources.", ex.getMessage());
    }

    p.test().assertEmpty().cancel();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testErrorThrownIssue1685() {
  Subject<Object> subject = ReplaySubject.create();
  Observable.error(new RuntimeException("oops"))
    .materialize()
    .delay(1, TimeUnit.SECONDS)
    .dematerialize(Functions.<Notification<Object>>identity())
    .subscribe(subject);
  subject.subscribe();
  subject.materialize().blockingFirst();
  System.out.println("Done");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testPublishSubjectValueRelay() {
  PublishSubject<Integer> async = PublishSubject.create();
  async.onNext(1);
  async.onComplete();
  Subject<Integer> serial = async.toSerialized();
  assertFalse(serial.hasObservers());
  assertTrue(serial.hasComplete());
  assertFalse(serial.hasThrowable());
  assertNull(serial.getThrowable());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void run() {
    s.onNext(1);
  }
};

相关文章