io.reactivex.Observable.groupBy()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(149)

本文整理了Java中io.reactivex.Observable.groupBy()方法的一些代码示例,展示了Observable.groupBy()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.groupBy()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:groupBy

Observable.groupBy介绍

[英]Groups the items emitted by an ObservableSource according to a specified criterion, and emits these grouped items as GroupedObservables. The emitted GroupedObservableSource allows only a single Observer during its lifetime and if this Observer calls dispose() before the source terminates, the next emission by the source having the same key will trigger a new GroupedObservableSource emission.

Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservableSources that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like #ignoreElements to them. Scheduler: groupBy does not operate by default on a particular Scheduler.
[中]根据指定的条件对可观测资源发出的项进行分组,并将这些分组的项作为GroupedObservable发出。发出的GroupedObservableSource在其生存期内仅允许一个观察者,如果此观察者在源终止之前调用dispose(),则具有相同密钥的源的下一次发射将触发新的GroupedObservableSource发射。
*注:*GroupedObservable将缓存它要发送的项目,直到订阅为止。因此,为了避免内存泄漏,您不应该简单地忽略那些与您无关的GroupedObservable源。相反,您可以向他们发出信号,他们可以通过对其应用类似于#ignoreElements的运算符来丢弃缓冲区。调度器:默认情况下,groupBy不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

public void groupByKeyNull() {
  just1.groupBy(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void groupByValueNull() {
  just1.groupBy(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return v;
    }
  }, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void groupByNull() {
  just1.groupBy(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testGroupByWithElementSelector() {
  Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
  Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);
  Map<Integer, Collection<Integer>> map = toMap(grouped);
  assertEquals(3, map.size());
  assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
  assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
  assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testGroupByWithElementSelector2() {
  Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
  Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);
  Map<Integer, Collection<Integer>> map = toMap(grouped);
  assertEquals(3, map.size());
  assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
  assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
  assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testGroupByWithNullKey() {
  final String[] key = new String[]{"uninitialized"};
  final List<String> values = new ArrayList<String>();
  Observable.just("a", "b", "c").groupBy(new Function<String, String>() {
    @Override
    public String apply(String value) {
      return null;
    }
  }).subscribe(new Consumer<GroupedObservable<String, String>>() {
    @Override
    public void accept(GroupedObservable<String, String> groupedObservable) {
      key[0] = groupedObservable.getKey();
      groupedObservable.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
          values.add(s);
        }
      });
    }
  });
  assertEquals(null, key[0]);
  assertEquals(Arrays.asList("a", "b", "c"), values);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testGroupBy() {
  Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
  Observable<GroupedObservable<Integer, String>> grouped = source.groupBy(length);
  Map<Integer, Collection<String>> map = toMap(grouped);
  assertEquals(3, map.size());
  assertArrayEquals(Arrays.asList("one", "two", "six").toArray(), map.get(3).toArray());
  assertArrayEquals(Arrays.asList("four", "five").toArray(), map.get(4).toArray());
  assertArrayEquals(Arrays.asList("three").toArray(), map.get(5).toArray());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void groupByValueReturnsNull() {
  just1.groupBy(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return v;
    }
  }, new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).groupBy(Functions.justFunction(1)));
  Observable.just(1)
  .groupBy(Functions.justFunction(1))
  .doOnNext(new Consumer<GroupedObservable<Integer, Integer>>() {
    @Override
    public void accept(GroupedObservable<Integer, Integer> g) throws Exception {
      TestHelper.checkDisposed(g);
    }
  })
  .test();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
  // choose an asynchronous source
  Observable<Long> source = Observable.interval(10, TimeUnit.MILLISECONDS).take(1);
  // apply groupBy to the source
  Observable<GroupedObservable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
  // create two observers
  Observer<GroupedObservable<Boolean, Long>> o1 = TestHelper.mockObserver();
  Observer<GroupedObservable<Boolean, Long>> o2 = TestHelper.mockObserver();
  // subscribe with the observers
  stream.subscribe(o1);
  stream.subscribe(o2);
  // check that subscriptions were successful
  verify(o1, never()).onError(Mockito.<Throwable> any());
  verify(o2, never()).onError(Mockito.<Throwable> any());
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Assert we get an IllegalStateException if trying to subscribe to an inner GroupedObservable more than once.
 */
@Test
public void testExceptionIfSubscribeToChildMoreThanOnce() {
  Observable<Integer> source = Observable.just(0);
  final AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<GroupedObservable<Integer, Integer>>();
  Observable<GroupedObservable<Integer, Integer>> m = source.groupBy(identity, dbl);
  m.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
    @Override
    public void accept(GroupedObservable<Integer, Integer> t1) {
      inner.set(t1);
    }
  });
  inner.get().subscribe();
  Observer<Integer> o2 = TestHelper.mockObserver();
  inner.get().subscribe(o2);
  verify(o2, never()).onComplete();
  verify(o2, never()).onNext(anyInt());
  verify(o2).onError(any(IllegalStateException.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testGroupByUnsubscribe() {
  final Disposable upstream = mock(Disposable.class);
  Observable<Integer> o = Observable.unsafeCreate(
      new ObservableSource<Integer>() {
        @Override
        public void subscribe(Observer<? super Integer> observer) {
          observer.onSubscribe(upstream);
        }
      }
  );
  TestObserver<Object> to = new TestObserver<Object>();
  o.groupBy(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer integer) {
      return null;
    }
  }).subscribe(to);
  to.dispose();
  verify(upstream).dispose();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testEmpty() {
  Observable<String> source = Observable.empty();
  Observable<GroupedObservable<Integer, String>> grouped = source.groupBy(length);
  Map<Integer, Collection<String>> map = toMap(grouped);
  assertTrue(map.isEmpty());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void innerEscapeCompleted() {
  Observable<Integer> source = Observable.just(0);
  Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Object> to = new TestObserver<Object>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  System.out.println(to.values());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void delayErrorSimpleComplete() {
    Observable.just(1)
    .groupBy(Functions.justFunction(1), true)
    .flatMap(Functions.<Observable<Integer>>identity())
    .test()
    .assertResult(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void keySelectorThrows() {
  Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
  Observable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Integer> to = new TestObserver<Integer>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  assertEquals(1, to.errorCount());
  to.assertNoValues();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void valueSelectorThrows() {
  Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
  Observable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
  TestObserver<Integer> to = new TestObserver<Integer>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  assertEquals(1, to.errorCount());
  to.assertNoValues();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testError2() {
  Observable<Integer> source = Observable.concat(Observable.just(0),
      Observable.<Integer> error(new TestException("Forced failure")));
  Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Object> to = new TestObserver<Object>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  assertEquals(1, to.errorCount());
  to.assertValueCount(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void keySelectorAndDelayError() {
  Observable.just(1).concatWith(Observable.<Integer>error(new TestException()))
  .groupBy(Functions.<Integer>identity(), true)
  .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(GroupedObservable<Integer, Integer> g) throws Exception {
      return g;
    }
  })
  .test()
  .assertFailure(TestException.class, 1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void keyAndValueSelectorAndDelayError() {
  Observable.just(1).concatWith(Observable.<Integer>error(new TestException()))
  .groupBy(Functions.<Integer>identity(), Functions.<Integer>identity(), true)
  .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(GroupedObservable<Integer, Integer> g) throws Exception {
      return g;
    }
  })
  .test()
  .assertFailure(TestException.class, 1);
}

相关文章

Observable类方法