Usage and code examples of the rx.Observable.subscribe() method

x33g5p2x, reposted on 2022-01-25 under Other

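This article collects code examples for the Java method rx.Observable.subscribe() and shows how Observable.subscribe() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Observable.subscribe() method:
Fully qualified name: rx.Observable
Class name: Observable
Method name: subscribe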

Introduction to Observable.subscribe

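Subscribes to an Observable but ignores its emissions and notifications. Scheduler: subscribe does not operate by default on a particular Scheduler.
This description applies to the no-argument overload. The examples below also use overloads that take an Action1, an Observer, or a Subscriber, which receive the emitted items and notifications instead of ignoring them.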

Code examples

Source: greenrobot/greenDAO

private void updateNotes() {
  notesQuery.list()
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Action1<List<Note>>() {
        @Override
        public void call(List<Note> notes) {
          notesAdapter.setNotes(notes);
        }
      });
}

Source: PipelineAI/pipeline

public void startCachingStreamValuesIfUnstarted() {
  if (rollingMaxSubscription.get() == null) {
    //the stream is not yet started
    Subscription candidateSubscription = observe().subscribe(rollingMax);
    if (rollingMaxSubscription.compareAndSet(null, candidateSubscription)) {
      //won the race to set the subscription
    } else {
      //lost the race to set the subscription, so we need to cancel this one
      candidateSubscription.unsubscribe();
    }
  }
}
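
The "subscribe, then compareAndSet, then unsubscribe if the race was lost" idiom in the snippet above can be tried out without RxJava. The following is only a minimal sketch under stated assumptions: `Handle` is a hypothetical stand-in for rx.Subscription, `subscribe()` is a hypothetical stand-in for `observe().subscribe(...)`, and the `created` list exists only so the demo can count how many handles are still active afterwards. None of these names come from the original project.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class SubscribeOnceSketch {

    /** Hypothetical stand-in for rx.Subscription: something that can be cancelled. */
    static final class Handle {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        void unsubscribe() { cancelled.set(true); }
        boolean isUnsubscribed() { return cancelled.get(); }
    }

    // Holds the single winning handle, like rollingMaxSubscription in the snippet above.
    private final AtomicReference<Handle> current = new AtomicReference<>();
    // Every handle ever created; kept only so the demo can inspect the losers.
    private final List<Handle> created = Collections.synchronizedList(new ArrayList<>());

    /** Hypothetical stand-in for observe().subscribe(...): each call yields a new live handle. */
    private Handle subscribe() {
        Handle handle = new Handle();
        created.add(handle);
        return handle;
    }

    void startIfUnstarted() {
        if (current.get() == null) {
            Handle candidate = subscribe();
            if (!current.compareAndSet(null, candidate)) {
                // Lost the race: another thread installed its handle first, so cancel ours.
                candidate.unsubscribe();
            }
        }
    }

    /** Starts the sketch from several threads at once and counts handles left active. */
    static int activeAfterConcurrentStart(int threads) {
        SubscribeOnceSketch sketch = new SubscribeOnceSketch();
        CountDownLatch go = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    go.await(); // line all threads up so they race on startIfUnstarted()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                sketch.startIfUnstarted();
            });
            workers.add(worker);
            worker.start();
        }
        go.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        int active = 0;
        synchronized (sketch.created) {
            for (Handle handle : sketch.created) {
                if (!handle.isUnsubscribed()) {
                    active++;
                }
            }
        }
        return active;
    }

    public static void main(String[] args) {
        System.out.println(activeAfterConcurrentStart(8));
    }
}
```

However many threads create a candidate, only one compareAndSet from null can succeed, so exactly one handle should remain active. The price of this lock-free approach is that a losing thread briefly holds a subscription it then has to cancel.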

Source: PipelineAI/pipeline

public void startCachingStreamValuesIfUnstarted() {
  if (rollingDistributionSubscription.get() == null) {
    //the stream is not yet started
    Subscription candidateSubscription = observe().subscribe(rollingDistribution);
    if (rollingDistributionSubscription.compareAndSet(null, candidateSubscription)) {
      //won the race to set the subscription
    } else {
      //lost the race to set the subscription, so we need to cancel this one
      candidateSubscription.unsubscribe();
    }
  }
}

Source: PipelineAI/pipeline

public void startCachingStreamValuesIfUnstarted() {
  if (subscription.get() == null) {
    //the stream is not yet started
    Subscription candidateSubscription = observe().subscribe(counterSubject);
    if (subscription.compareAndSet(null, candidateSubscription)) {
      //won the race to set the subscription
    } else {
      //lost the race to set the subscription, so we need to cancel this one
      candidateSubscription.unsubscribe();
    }
  }
}

Source: greenrobot/greenDAO

@Override
public void onNoteClick(int position) {
  Note note = notesAdapter.getNote(position);
  final Long noteId = note.getId();
  noteDao.deleteByKey(noteId)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Action1<Void>() {
        @Override
        public void call(Void aVoid) {
          Log.d("DaoExample", "Deleted note, ID: " + noteId);
          updateNotes();
        }
      });
}

Source: PipelineAI/pipeline

@Test
public void noEvents() throws InterruptedException {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  //no writes
  assertFalse(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

Source: PipelineAI/pipeline

@Test
public void noEvents() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
  commandStream.observe().take(1).subscribe(subscriber);
  //no writes
  assertFalse(latch.await(1000, TimeUnit.MILLISECONDS));
}
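
The tests above rely on a `getLatchedSubscriber` helper that is not shown in the excerpts. The idea they depend on, that a CountDownLatch is released only when an event reaches the subscriber, can be sketched with the JDK alone. This is a hedged illustration, not the helper from the original project: `EventStream`, `latchedListener`, and `eventArrives` are hypothetical names, and a plain `Consumer` stands in for rx.Subscriber.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchedListenerSketch {

    /** Hypothetical minimal event stream; stands in for the stream under test. */
    static final class EventStream<T> {
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void write(T event) {
            for (Consumer<T> listener : listeners) {
                listener.accept(event);
            }
        }
    }

    /** Listener that counts the latch down each time an event is delivered. */
    static <T> Consumer<T> latchedListener(CountDownLatch latch) {
        return event -> latch.countDown();
    }

    /** Returns true if an event reached the listener before the timeout elapsed. */
    static boolean eventArrives(boolean writeEvent, long timeoutMillis) {
        EventStream<String> stream = new EventStream<>();
        CountDownLatch latch = new CountDownLatch(1);
        stream.subscribe(latchedListener(latch));
        if (writeEvent) {
            // Deliver from another thread, as an asynchronous producer would.
            new Thread(() -> stream.write("SUCCESS")).start();
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(eventArrives(false, 100)); // no write: the wait times out
        System.out.println(eventArrives(true, 1000)); // one write: the latch is released
    }
}
```

As in the tests, `await` returning false is read as "no event arrived" and true as "an event arrived". The negative case always costs the full timeout, which is why such tests run for about a second each.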

Source: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedSuccess() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

Source: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedTimeout() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.TIMEOUT);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

Source: greenrobot/greenDAO

static <T> TestSubscriber<T> awaitTestSubscriber(Observable<T> observable) {
  TestSubscriber<T> testSubscriber = new TestSubscriber<>();
  observable.subscribe(testSubscriber);
  testSubscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
  testSubscriber.assertNoErrors();
  testSubscriber.assertCompleted();
  return testSubscriber;
}

Source: PipelineAI/pipeline

@Test
public void testThreadIsolatedSuccess() throws InterruptedException {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

Source: PipelineAI/pipeline

@Test
public void testObservableRaiseHystrixRuntimeException() {
  TestSubscriber<Void> testSubscriber = new TestSubscriber<Void>();
  service.observableCommandShouldRaiseHystrixRuntimeException().subscribe(testSubscriber);
  testSubscriber.assertError(HystrixRuntimeException.class);
}

Source: PipelineAI/pipeline

@Test
public void testPropagateCauseException() throws NotFoundException {
  TestSubscriber<Void> testSubscriber = new TestSubscriber<Void>();
  userService.deleteUser("").subscribe(testSubscriber);
  testSubscriber.assertError(NotFoundException.class);
}

Source: PipelineAI/pipeline

@Test
public void testDoNotInterruptObserveOnTimeoutIfPropertySaysNotTo() throws InterruptedException {
  // given
  InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), false);
  // when
  cmd.observe().subscribe();
  // then
  Thread.sleep(500);
  assertFalse(cmd.hasBeenInterrupted());
}

Source: PipelineAI/pipeline

@Test
public void testInterruptToObservableOnTimeout() throws InterruptedException {
  // given
  InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true);
  // when
  cmd.toObservable().subscribe();
  // then
  Thread.sleep(500);
  assertTrue(cmd.hasBeenInterrupted());
}

Source: PipelineAI/pipeline

@Test
public void testSingleWriteSingleSubscriber() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
  commandStream.observe().take(1).subscribe(subscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
  HystrixCommandCompletion event = HystrixCommandCompletion.from(result, commandKey, threadPoolKey);
  commandStream.write(event);
  assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
}

Source: PipelineAI/pipeline

@Test
public void testInterruptObserveOnTimeout() throws InterruptedException {
  // given
  InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true);
  // when
  cmd.observe().subscribe();
  // then
  Thread.sleep(500);
  assertTrue(cmd.hasBeenInterrupted());
}

Source: PipelineAI/pipeline

@Test
public void testDoNotInterruptToObservableOnTimeoutIfPropertySaysNotTo() throws InterruptedException {
  // given
  InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), false);
  // when
  cmd.toObservable().subscribe();
  // then
  Thread.sleep(500);
  assertFalse(cmd.hasBeenInterrupted());
}
