Usage and code examples for the io.reactivex.Observable.flatMapIterable() method

Reposted by x33g5p2x on 2022-01-25 under "Other"

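This article collects code examples for the Java method io.reactivex.Observable.flatMapIterable() and shows how Observable.flatMapIterable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. The details of Observable.flatMapIterable() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: flatMapIterable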

Introduction to Observable.flatMapIterable

Returns an Observable that merges each item emitted by the source ObservableSource with the values in an Iterable corresponding to that item that is generated by a selector.

Scheduler: flatMapIterable does not operate by default on a particular Scheduler.
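
As a minimal sketch of the behavior described above, assuming RxJava 2.x is on the classpath: each source item is mapped to an Iterable, and the values of every Iterable are emitted one after another. The class and method names here are illustrative and do not come from the original article.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

final class FlatMapIterableBasics {
    // Maps every source item n to the two-element list [n, n * 10]
    // and collects the flattened sequence into a single list.
    static List<Integer> expand() {
        return Observable.just(1, 2, 3)
                .flatMapIterable(n -> Arrays.asList(n, n * 10))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(expand()); // prints [1, 10, 2, 20, 3, 30]
    }
}
```

Blocking with `blockingGet()` is only for demonstration; in application code the flattened Observable would normally be subscribed to instead.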

Code examples

Code example source: ReactiveX/RxJava

// Fragment: the opening of the enclosing helper call is missing from this excerpt.
  @Override
  public Object apply(Observable<Integer> o) throws Exception {
    return o.flatMapIterable(new Function<Object, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> apply(Object v) throws Exception {
        return Arrays.asList(10, 20);
      }
    });
  }
}, false, 1, 1, 10, 20);

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void flatMapIterableCombinerNull() {
  just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) {
      return Arrays.asList(1);
    }
  }, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void flatMapIterableMapperNull() {
  just1.flatMapIterable(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void flatMapIterableMapperIterableOneNull() {
  just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) {
      return Arrays.asList(1, null);
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void flatMapIterableMapperReturnsNull() {
  just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void flatMapIterableCombinerReturnsNull() {
  just1.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) {
      return Arrays.asList(1);
    }
  }, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void flatMapIterableMapperIteratorNull() {
  just1.flatMapIterable(new Function<Integer, Iterable<Object>>() {
    @Override
    public Iterable<Object> apply(Integer v) {
      return new Iterable<Object>() {
        @Override
        public Iterator<Object> iterator() {
          return null;
        }
      };
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void testResultFunctionThrows() {
  Observer<Object> o = TestHelper.mockObserver();
  final List<Integer> list = Arrays.asList(1, 2, 3);
  Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
    @Override
    public List<Integer> apply(Integer t1) {
      return list;
    }
  };
  BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      throw new TestException();
    }
  };
  List<Integer> source = Arrays.asList(16, 32, 64);
  Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
  verify(o, never()).onComplete();
  verify(o, never()).onNext(any());
  verify(o).onError(any(TestException.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testCollectionFunctionThrows() {
  Observer<Object> o = TestHelper.mockObserver();
  Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
    @Override
    public List<Integer> apply(Integer t1) {
      throw new TestException();
    }
  };
  BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 | t2;
    }
  };
  List<Integer> source = Arrays.asList(16, 32, 64);
  Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
  verify(o, never()).onComplete();
  verify(o, never()).onNext(any());
  verify(o).onError(any(TestException.class));
}

Code example source: ReactiveX/RxJava

@Test
public void iterableMapperFunctionReturnsNull() {
  Observable.just(1)
  .flatMapIterable(new Function<Integer, Iterable<Object>>() {
    @Override
    public Iterable<Object> apply(Integer v) throws Exception {
      return null;
    }
  }, new BiFunction<Integer, Object, Object>() {
    @Override
    public Object apply(Integer v, Object w) throws Exception {
      return v;
    }
  })
  .test()
  .assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Iterable");
}
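
Outside a test harness, the same failure can be observed through the error callback. The following is a sketch only, assuming RxJava 2.x; the class name and the use of an `AtomicReference` to capture the error are illustrative choices, not part of the RxJava test suite.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicReference;

final class NullIterableDemo {
    // Subscribes to a flatMapIterable whose mapper returns null and
    // returns whatever Throwable reaches the onError callback.
    static Throwable captureError() {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Observable.just(1)
                .flatMapIterable(v -> (Iterable<Object>) null, (v, w) -> v)
                .subscribe(item -> { }, error::set);
        // Observable.just emits synchronously, so the error is already recorded here.
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println(captureError());
    }
}
```

The null Iterable does not crash the caller: the operator routes a NullPointerException to `onError`, which is why supplying an error handler matters when the mapper may return null.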

Code example source: ReactiveX/RxJava

/**
 * Returns an Observable that emits the events emitted by source ObservableSource, in a
 * sorted order based on a specified comparison function.
 *
 * <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}
 *
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param sortFunction
 *            a function that compares two items emitted by the source ObservableSource and returns an Integer
 *            that indicates their sort order
 * @return an Observable that emits the items emitted by the source ObservableSource in sorted order
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> sorted(Comparator<? super T> sortFunction) {
  ObjectHelper.requireNonNull(sortFunction, "sortFunction is null");
  return toList().toObservable().map(Functions.listSorter(sortFunction)).flatMapIterable(Functions.<List<T>>identity());
}

Code example source: ReactiveX/RxJava

/**
 * Returns an Observable that emits the events emitted by source ObservableSource, in a
 * sorted order. Each item emitted by the ObservableSource must implement {@link Comparable} with respect to all
 * other items in the sequence.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sorted.png" alt="">
 * <p>
 * If any item emitted by this Observable does not implement {@link Comparable} with respect to
 * all other items emitted by this Observable, no items will be emitted and the
 * sequence is terminated with a {@link ClassCastException}.
 *
 * <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}
 *
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @return an Observable that emits the items emitted by the source ObservableSource in sorted order
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> sorted() {
  return toList().toObservable().map(Functions.listSorter(Functions.<T>naturalComparator())).flatMapIterable(Functions.<List<T>>identity());
}
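
The two `sorted` implementations above follow one pattern: collect everything into a list, sort the list, then use `flatMapIterable` with an identity function to turn the list back into individual items. Below is a hand-written sketch of that pattern next to the built-in operator, assuming RxJava 2.x; it uses `Collections.sort` in place of the internal `Functions.listSorter` helper, and the class and method names are illustrative.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SortedViaFlatMapIterable {
    // Manual version: toList -> sort a copy -> flatten the list back into items.
    static List<Integer> sortedManually() {
        return Observable.just(3, 1, 2)
                .toList()
                .toObservable()
                .map(list -> {
                    List<Integer> copy = new ArrayList<>(list);
                    Collections.sort(copy);
                    return copy;
                })
                .flatMapIterable(list -> list)
                .toList()
                .blockingGet();
    }

    // Built-in operator, for comparison.
    static List<Integer> sortedBuiltIn() {
        return Observable.just(3, 1, 2).sorted().toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(sortedManually()); // prints [1, 2, 3]
        System.out.println(sortedBuiltIn());  // prints [1, 2, 3]
    }
}
```

Because the whole sequence is buffered in a list first, this pattern only suits finite sources, which matches the OutOfMemoryError warning in the Javadoc.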

Code example source: ReactiveX/RxJava

@Test
public void testNormal() {
  Observer<Object> o = TestHelper.mockObserver();
  final List<Integer> list = Arrays.asList(1, 2, 3);
  Function<Integer, List<Integer>> func = new Function<Integer, List<Integer>>() {
    @Override
    public List<Integer> apply(Integer t1) {
      return list;
    }
  };
  BiFunction<Integer, Integer, Integer> resFunc = new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 | t2;
    }
  };
  List<Integer> source = Arrays.asList(16, 32, 64);
  Observable.fromIterable(source).flatMapIterable(func, resFunc).subscribe(o);
  for (Integer s : source) {
    for (Integer v : list) {
      verify(o).onNext(s | v);
    }
  }
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}
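
The test above verifies calls on a mock Observer. To see the values that the two-argument overload actually produces, the following sketch collects them into a list. It assumes RxJava 2.x and reuses the test's inputs (16, 32, 64 combined with 1, 2, 3 through bitwise OR); the class and method names are illustrative.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

final class FlatMapIterableResultSelector {
    // For every outer item, iterates the inner list and combines
    // the outer item with each inner value via the result selector.
    static List<Integer> combine() {
        List<Integer> inner = Arrays.asList(1, 2, 3);
        return Observable.fromIterable(Arrays.asList(16, 32, 64))
                .flatMapIterable(outer -> inner, (outer, item) -> outer | item)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Nine combined values: 17, 18, 19, 33, 34, 35, 65, 66, 67.
        System.out.println(new HashSet<>(combine()));
    }
}
```

The result selector receives the original source item together with each value from its Iterable, which is useful when the flattened values alone would lose track of their parent.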

Code example source: ReactiveX/RxJava

// Fragment: this excerpt is cut off before the method body.
.flatMapIterable(new Function<Integer, Iterable<Integer>>() {
  @Override
  public Iterable<Integer> apply(Integer v)

Code example source: bitrich-info/xchange-stream

public Observable<PoloniexWebSocketEvent> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
  String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
  return subscribeChannel(channelName)
      .flatMapIterable(s -> {
        PoloniexWebSocketEventsTransaction transaction = objectMapper.treeToValue(s, PoloniexWebSocketEventsTransaction.class);
        return Arrays.asList(transaction.getEvents());
      }).share();
}

Code example source: bitrich-info/xchange-stream

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
  // Keep only "trade" events and convert each JSON message into an array of Gemini trades.
  Observable<GeminiTrade[]> subscribedTrades = service.subscribeChannel(currencyPair, args)
      .filter(s -> filterEventsByReason(s, "trade", null))
      .map((JsonNode s) -> {
        GeminiWebSocketTransaction transaction = mapper.treeToValue(s, GeminiWebSocketTransaction.class);
        return transaction.toGeminiTrades();
      });

  // Flatten each adapted batch into individual Trade items.
  return subscribedTrades.flatMapIterable(s -> adaptTrades(s, currencyPair).getTrades());
}

Code example source: sczyh30/vertx-blueprint-todo-backend

@Override
public Maybe<Todo> getCertain(String todoID) {
  return client.rxQueryWithParams(SQL_QUERY, new JsonArray().add(todoID))
    .map(ResultSet::getRows)
    .toObservable()
    .flatMapIterable(e -> e) // flatten the list of rows into individual rows
    .singleElement()
    .map(Todo::new);
}
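
The `flatMapIterable(e -> e)` idiom above flattens a single emitted list into its elements so that `singleElement()` can pick the only row. Here is a self-contained sketch of that idiom, assuming RxJava 2.x; `Single.just(rows)` is a hypothetical stand-in for the database query, and the class and method names are illustrative.

```java
import io.reactivex.Maybe;
import io.reactivex.Single;
import java.util.Collections;
import java.util.List;

final class FlattenRows {
    // Hypothetical stand-in for a query that resolves to a list of rows:
    // flattens the list and yields its only element, if there is one.
    static Maybe<String> onlyRow(List<String> rows) {
        return Single.just(rows)
                .toObservable()
                .flatMapIterable(list -> list)
                .singleElement();
    }

    public static void main(String[] args) {
        System.out.println(onlyRow(Collections.singletonList("row-1")).blockingGet()); // prints row-1
        System.out.println(onlyRow(Collections.<String>emptyList()).blockingGet());    // prints null
    }
}
```

An empty list completes the Maybe without a value, while a list with more than one element makes `singleElement()` signal an error, so this idiom fits lookups that are expected to match at most one row.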

Code example source: bitrich-info/xchange-stream

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
  String channelName = "lightning_executions_" + currencyPair.base.toString() + "_" + currencyPair.counter.toString();
  // Each channel message may carry several trades; flatten them into individual items.
  Observable<BitflyerTrade> tradeTransactions = streamingService.subscribeChannel(channelName).flatMapIterable(s -> {
    BitflyerPubNubTradesTransaction transaction = new BitflyerPubNubTradesTransaction(s);
    return transaction.toBitflyerTrades();
  });

  return tradeTransactions.map(s -> s.toTrade(currencyPair));
}
