rx.Observable.window()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.4k)|赞(0)|评价(0)|浏览(240)

本文整理了Java中rx.Observable.window()方法的一些代码示例,展示了Observable.window()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.window()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:window

Observable.window介绍

[英]Returns an Observable that emits windows of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping windows, each containing count items. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.

Backpressure Support: This operator does not support backpressure as it uses count to control data flow. Scheduler: This version of window does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出它从源Observable收集的项目窗口。由此产生的可见光发射连接的、不重叠的窗口,每个窗口都包含计数项。当源可观察对象完成或遇到错误时,生成的可观察对象将发出当前窗口,并从源可观察对象传播通知。
背压支持:此运算符不支持背压,因为它使用计数来控制数据流。调度程序:默认情况下,此版本的Windows不会在特定的调度程序上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

@Override
  public Observable<Bucket> call() {
    return inputEventStream
        .observe()
        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
  }
});

代码示例来源:origin: PipelineAI/pipeline

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                  final Func2<Histogram, Event, Histogram> addValuesToBucket) {
  final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
  for (int i = 0; i < numBuckets; i++) {
    emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
  }
  final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
    @Override
    public Observable<Histogram> call(Observable<Event> bucket) {
      return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
    }
  };
  rollingDistributionStream = stream
      .observe()
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
      .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
      .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
      .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
      .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
      .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                    final Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
  Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
      return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
  };
  this.sourceStream = bucketedStream      //stream broken up into buckets
      .window(numBuckets, 1)          //emit overlapping windows of buckets
      .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

代码示例来源:origin: jhusain/learnrxjava

private static void flatMapWindowedExampleAsync() {
  Observable.range(0, 5000).window(500).flatMap(work -> {
    return work.observeOn(Schedulers.computation()).map(item -> {
      // simulate computational work
        try {
          Thread.sleep(1);
        } catch (Exception e) {
        }
        return item + " processed " + Thread.currentThread();
      });
  }).toBlocking().forEach(System.out::println);
}

代码示例来源:origin: jhusain/learnrxjava

public static void main(String args[]) {
  // buffer every 500ms (using 999999999 to mark start of output)
  hotStream().window(500, TimeUnit.MILLISECONDS).take(10).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
  // buffer 10 items at a time (using 999999999 to mark start of output)
  hotStream().window(10).take(2).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
  System.out.println("Done");
}

代码示例来源:origin: com.netflix.rxjava/rxjava-core

/**
 * Returns an Observable that emits windows of items it collects from the source Observable. The resulting
 * Observable emits connected, non-overlapping windows, each containing {@code count} items. When the source
 * Observable completes or encounters an error, the resulting Observable emits the current window and
 * propagates the notification from the source Observable.
 * <p>
 * <img width="640" height="400" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window3.png" alt="">
 * <dl>
 *  <dt><b>Backpressure Support:</b></dt>
 *  <dd>This operator does not support backpressure as it uses {@code count} to control data flow.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param count
 *            the maximum size of each window before it should be emitted
 * @return an Observable that emits connected, non-overlapping windows, each containing at most
 *         {@code count} items from the source Observable
 * @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava wiki: window</a>
 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window.aspx">MSDN: Observable.Window</a>
 */
public final Observable<Observable<T>> window(int count) {
  return window(count, count);
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
  public Observable<BatchStatement> call(Observable<BoundStatement> statements) {
    return statements
        .window(batchSize)
        .flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add));
  }
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
  public Observable<BatchStatement> call(Observable<Statement> statements) {
    return statements
        .window(batchSize)
        .flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add));
  }
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@Override
  public Observable<BatchStatement> call(Observable<BoundStatement> statements) {
    return statements
        .window(batchSize)
        .flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add));
  }
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@Override
  public Observable<BatchStatement> call(Observable<Statement> statements) {
    return statements
        .window(batchSize)
        .flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add));
  }
}

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable.range(1, 10).window(3).subscribe(new Action1<Observable<Integer>>() {
   @Override
   public void call(final Observable<Integer> integerObservable) {
    integerObservable.subscribe(new Action1<Integer>() {
     @Override
     public void call(Integer integer) {
      log(integer + " of window " + integerObservable);
     }
    });
   }
  });
 }
});

代码示例来源:origin: com.netflix.hystrix/hystrix-core

@Override
  public Observable<Bucket> call() {
    return inputEventStream
        .observe()
        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
  }
});

代码示例来源:origin: com.netflix.hystrix/hystrix-core

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: com.netflix.hystrix/hystrix-core

protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                  final Func2<Histogram, Event, Histogram> addValuesToBucket) {
  final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
  for (int i = 0; i < numBuckets; i++) {
    emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
  }
  final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
    @Override
    public Observable<Histogram> call(Observable<Event> bucket) {
      return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
    }
  };
  rollingDistributionStream = stream
      .observe()
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
      .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
      .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
      .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
      .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
      .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_182() throws Exception {
  Observable
      .range(0, Integer.MAX_VALUE)
      .map(Picture::new)
      .window(1, TimeUnit.SECONDS)
      .flatMap(Observable::count)
      .subscribe(System.out::println);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_192() throws Exception {
  Observable
      .range(0, Integer.MAX_VALUE)
      .map(Picture::new)
      .window(10, TimeUnit.SECONDS)
      .flatMap(Observable::distinct);
}

代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway

@Override
  public Observable<FeedResponse<T>> call(Observable<T> source) {
    return source
        // .windows: creates an observable of observable where inner observable
        // emits max maxPageSize elements 
        .window(maxPageSize)
        .map(o -> o.toList())
        // flattens the observable<Observable<List<T>>> to Observable<List<T>>
        .flatMap(resultListObs -> resultListObs, 1)
        // translates Observable<List<T>> to Observable<FeedResponsePage<T>>>
        .map(resultList -> { 
          // construct a page from result of 
          return BridgeInternal.createFeedResponse(resultList, headerResponse(tracker.getAndResetCharge()));
        }).switchIfEmpty(
            Observable.defer(() -> {
              // create an empty page if there is no result
              return Observable.just(BridgeInternal.createFeedResponse(
                  Utils.immutableListOf(),
                  headerResponse(tracker.getAndResetCharge())));
            }));
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_216() throws Exception {
  Observable<KeyEvent> keyEvents = empty();
  Observable<Observable<KeyEvent>> windows = keyEvents.window(1, SECONDS);
  Observable<Integer> eventPerSecond = windows
      .flatMap(eventsInSecond -> eventsInSecond.count());
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_82() throws Exception {
  HystrixCommandCompletionStream
      .getInstance(HystrixCommandKey.Factory.asKey("FetchRating"))
      .observe()
      .filter(e -> e.getEventCounts().getCount(FAILURE) > 0)
      .window(1, TimeUnit.SECONDS)
      .flatMap(Observable::count)
      .subscribe(x -> log.info("{} failures/s", x));
}

相关文章

Observable类方法