Usage and code examples of the io.reactivex.subjects.Subject.toFlowable() method

Reposted by x33g5p2x on 2022-01-30 under Other

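This article collects code examples for the Java method io.reactivex.subjects.Subject.toFlowable() and shows how Subject.toFlowable() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of Subject.toFlowable() are as follows:
Package path: io.reactivex.subjects.Subject
Class name: Subject
Method name: toFlowable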

About Subject.toFlowable

None available.
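
As background for the examples that follow: toFlowable(BackpressureStrategy) is declared on io.reactivex.Observable and inherited by Subject, and the strategy argument decides what happens to items the subscriber has not yet requested. The following is a minimal sketch, assuming RxJava 2 on the classpath. The class name BackpressureStrategies and the method deliver are hypothetical and exist only for illustration.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subscribers.TestSubscriber;

import java.util.List;

public class BackpressureStrategies {

    /**
     * Emits 1, 2, 3 into a subject while the subscriber has requested only
     * one item, then requests one more and returns everything delivered.
     */
    static List<Integer> deliver(BackpressureStrategy strategy) {
        PublishSubject<Integer> subject = PublishSubject.create();

        // test(1) subscribes a TestSubscriber that requests a single item up front.
        TestSubscriber<Integer> subscriber = subject.toFlowable(strategy).test(1);

        subject.onNext(1); // delivered: one item was requested
        subject.onNext(2); // no outstanding request: buffered, dropped, or kept as latest
        subject.onNext(3); // same situation as above

        subscriber.requestMore(1); // ask for one more item
        return subscriber.values();
    }

    public static void main(String[] args) {
        System.out.println(deliver(BackpressureStrategy.BUFFER)); // [1, 2]
        System.out.println(deliver(BackpressureStrategy.DROP));   // [1]
        System.out.println(deliver(BackpressureStrategy.LATEST)); // [1, 3]
    }
}
```

BUFFER keeps every item until it is requested, DROP discards items that arrive without an outstanding request, and LATEST keeps only the most recent one. Most examples below use BUFFER, which is unbounded and can grow without limit if the consumer is slow.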

Code examples

Code example source: pwittchen/ReactiveNetwork

@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
 final String service = Context.CONNECTIVITY_SERVICE;
 final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
 networkCallback = createNetworkCallback(context);
 registerIdleReceiver(context);
 final NetworkRequest request =
   new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
     .addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
     .build();
 manager.registerNetworkCallback(request, networkCallback);
 return connectivitySubject.toFlowable(BackpressureStrategy.LATEST).doOnCancel(new Action() {
  @Override public void run() {
   tryToUnregisterCallback(manager);
   tryToUnregisterReceiver(context);
  }
 }).startWith(Connectivity.create(context)).distinctUntilChanged().toObservable();
}
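
The same operator chain can be tried outside Android. The following is a rough sketch, assuming RxJava 2, in which a String subject and an AtomicBoolean stand in for the connectivity subject and the callback registration. The names CleanupOnCancel, stateSubject, callbackRegistered, and observeState are hypothetical and are not part of ReactiveNetwork.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CleanupOnCancel {
    // Hypothetical stand-ins for the connectivity subject and the registered callback.
    static final PublishSubject<String> stateSubject = PublishSubject.create();
    static final AtomicBoolean callbackRegistered = new AtomicBoolean();

    static Observable<String> observeState(String initialState) {
        callbackRegistered.set(true); // stands in for registering a system callback
        return stateSubject
                .toFlowable(BackpressureStrategy.LATEST)
                // Runs when the downstream subscriber disposes.
                .doOnCancel(() -> callbackRegistered.set(false))
                .startWith(initialState)  // emit the current state first
                .distinctUntilChanged()   // suppress consecutive duplicates
                .toObservable();
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Disposable subscription = observeState("OFFLINE").subscribe(seen::add);

        stateSubject.onNext("OFFLINE"); // same as the initial state, filtered out
        stateSubject.onNext("ONLINE");  // delivered
        subscription.dispose();         // triggers doOnCancel
        stateSubject.onNext("OFFLINE"); // ignored, nobody is subscribed

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());                   // [OFFLINE, ONLINE]
        System.out.println(callbackRegistered.get()); // false
    }
}
```

Placing doOnCancel before startWith means the cleanup is tied to the subject-backed part of the chain, which is the part that holds the external registration.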

Code example source: ch.squaredesk.nova/websockets

@Override
public Flowable<WebSocket> connectingSockets() {
  return connectedSockets.toFlowable(BackpressureStrategy.BUFFER);
}

Code example source: ch.squaredesk.nova/websockets

@Override
public Flowable<Pair<WebSocket, CloseReason>> closingSockets() {
  return closedSockets.toFlowable(BackpressureStrategy.BUFFER);
}

Code example source: qyxxjd/BaseProject

@SuppressWarnings("WeakerAccess") public Flowable<Object> toFlowable(BackpressureStrategy strategy) {
  return mBus.toFlowable(strategy);
}

Code example source: ch.squaredesk.nova/websockets

@Override
public Flowable<Pair<WebSocket, String>> messages() {
  return messages.toFlowable(BackpressureStrategy.BUFFER);
}

Code example source: ch.squaredesk.nova/websockets

@Override
public Flowable<Pair<WebSocket, Throwable>> errors() {
  return errors.toFlowable(BackpressureStrategy.BUFFER);
}

Code example source: contentful/vault

public static Flowable<SyncResult> observeSyncResults() {
 return SYNC_SUBJECT.toFlowable(BackpressureStrategy.BUFFER);
}

Code example source: cr330326/DemoComponent

/**
 * Returns an observable stream of events of the given type (eventType).
 *
 * @param eventType the event type
 * @return a Flowable that emits only events of type eventType
 */
private <T> Flowable<T> toObservable(Class<T> eventType) {
  return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
}
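
For context, the surrounding event-bus class that such a method usually lives in might look like the following. This is a minimal sketch assuming RxJava 2. The class TypedEventBus and its post method are hypothetical and are not taken from any of the quoted projects.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

public class TypedEventBus {
    // Serialized so that post() can safely be called from several threads.
    private final Subject<Object> bus = PublishSubject.<Object>create().toSerialized();

    /** Publishes an event to all current subscribers. */
    public void post(Object event) {
        bus.onNext(event);
    }

    /** Returns a Flowable that emits only events that are instances of eventType. */
    public <T> Flowable<T> toFlowable(Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
    }

    static List<String> demo() {
        TypedEventBus eventBus = new TypedEventBus();
        List<String> strings = new ArrayList<>();
        eventBus.toFlowable(String.class).subscribe(strings::add);

        eventBus.post("hello");
        eventBus.post(42);      // not a String, filtered out by ofType
        eventBus.post("world");
        return strings;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [hello, world]
    }
}
```

Because the bus is backed by a PublishSubject, subscribers only see events posted after they subscribe.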

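Code example source: simplezhli/RxPay

/**
 * Returns a backpressure-aware Flowable that emits events of the specified type.
 *
 * @param <T> the event type
 * @param type the class of the events to receive
 * @return a Flowable that emits only events of the given type
 */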
public <T>Flowable<T> toObservable(Class<T> type){
  return mSubject.toFlowable(BackpressureStrategy.BUFFER)
      .ofType(type);
}

Code example source: LuckSiege/PictureSelectorLight

/**
 * Returns an observable stream of events of the given type (eventType).
 *
 * @param eventType the event type
 * @return a Flowable that emits only events of type eventType
 */
public  <T> Flowable<T> toObservable(Class<T> eventType) {
  return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
}

Code example source: info.magnolia.ui/magnolia-ui-framework

public Disposable observeNullable(Consumer<T> action) {
  return subject
      .toFlowable(BackpressureStrategy.LATEST)
      .map(optional -> optional)
      .subscribe(
          optional -> action.accept(optional.orElse(null)),
          e -> log.error("Failed to dispatch context property change: {}", e.getMessage(), e));
}

Code example source: ch.squaredesk.nova/http

@Override
public <T> Flowable<RpcInvocation<T>> requests(String destination, Class<T> targetType) {
  URL destinationAsLocalUrl;
  try {
    destinationAsLocalUrl = new URL("http", "localhost", destination);
  } catch (MalformedURLException e) {
    throw new RuntimeException(e);
  }
  Flowable retVal = mapDestinationToIncomingMessages
      .computeIfAbsent(destination, key -> {
        logger.info("Listening to requests on " + destination);
        Subject<RpcInvocation> stream = PublishSubject.create();
        stream = stream.toSerialized();
        NonBlockingHttpHandler httpHandler = new NonBlockingHttpHandler(destinationAsLocalUrl, messageTranscriber, targetType, stream);
        httpServer.getServerConfiguration().addHttpHandler(httpHandler, destination);
        return stream.toFlowable(BackpressureStrategy.BUFFER)
            .doFinally(() -> {
              mapDestinationToIncomingMessages.remove(destination);
              httpServer.getServerConfiguration().removeHttpHandler(httpHandler);
              logger.info("Stopped listening to requests on " + destination);
            })
            .share();
      });
  return retVal;
}
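
The combination of toSerialized(), doFinally(), and share() used above can be observed in isolation. The following is a small sketch assuming RxJava 2, with a counter standing in for the handler removal. The class name SharedStream and the method demo are hypothetical.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedStream {

    /** Returns the cleanup count after the first and after the last subscriber disposes. */
    static List<Integer> demo() {
        AtomicInteger cleanups = new AtomicInteger(); // stands in for removing a handler

        // toSerialized() makes onNext safe to call from multiple threads.
        Subject<String> stream = PublishSubject.<String>create().toSerialized();

        Flowable<String> shared = stream
                .toFlowable(BackpressureStrategy.BUFFER)
                .doFinally(() -> cleanups.incrementAndGet())
                .share(); // one upstream subscription, shared by all subscribers

        Disposable first = shared.subscribe(item -> { });
        Disposable second = shared.subscribe(item -> { });
        stream.onNext("request");

        first.dispose();                  // one subscriber is still connected
        int afterFirst = cleanups.get();

        second.dispose();                 // last subscriber gone, upstream is cancelled
        int afterLast = cleanups.get();

        return Arrays.asList(afterFirst, afterLast);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1]
    }
}
```

Since share() only cancels the upstream once the last subscriber leaves, the doFinally action runs once per connection rather than once per subscriber.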

Code example source: LuckSiege/PictureSelectorLight

/**
 * Returns an observable stream of events that match the given code and event type (eventType).
 *
 * @param code      the event code
 * @param eventType the event type
 */
private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
  return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
      .filter(new Predicate<Message>() {
        @Override
        public boolean test(Message o) throws Exception {
          return o.getCode() == code && eventType.isInstance(o.getObject());
        }
      }).map(new Function<Message, Object>() {
        @Override
        public Object apply(Message o) throws Exception {
          return o.getObject();
        }
      }).cast(eventType);
}
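
With Java 8 lambdas the same filtering can be written more compactly. The following is a sketch assuming RxJava 2. The Message class here is a hypothetical envelope with public fields, not the Message class used by the quoted project, and CodedEventBus and post are likewise illustrative names.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

public class CodedEventBus {

    /** Hypothetical envelope pairing an integer code with a payload. */
    static final class Message {
        final int code;
        final Object payload;

        Message(int code, Object payload) {
            this.code = code;
            this.payload = payload;
        }
    }

    private final Subject<Object> bus = PublishSubject.<Object>create().toSerialized();

    /** Wraps the payload in a Message and publishes it. */
    public void post(int code, Object payload) {
        bus.onNext(new Message(code, payload));
    }

    /** Emits payloads whose message code matches and whose type is eventType. */
    public <T> Flowable<T> toFlowable(int code, Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER)
                .ofType(Message.class)
                .filter(m -> m.code == code && eventType.isInstance(m.payload))
                .map(m -> eventType.cast(m.payload));
    }

    static List<String> demo() {
        CodedEventBus eventBus = new CodedEventBus();
        List<String> received = new ArrayList<>();
        eventBus.toFlowable(1, String.class).subscribe(received::add);

        eventBus.post(1, "match");
        eventBus.post(2, "wrong code"); // filtered out: code differs
        eventBus.post(1, 99);           // filtered out: payload is not a String
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [match]
    }
}
```

The isInstance check in the filter also rejects null payloads, so the map step never returns null.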

Code example source: cr330326/DemoComponent

/**
 * Returns an observable stream of events that match the given code and event type (eventType).
 *
 * @param code      the event code
 * @param eventType the event type
 */
private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
  return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
      .filter(new Predicate<Message>() {
        @Override
        public boolean test(Message o) throws Exception {
          return o.getCode() == code && eventType.isInstance(o.getObject());
        }
      }).map(new Function<Message, Object>() {
        @Override
        public Object apply(Message o) throws Exception {
          return o.getObject();
        }
      }).cast(eventType);
}

Code example source: akarnokd/akarnokd-misc

.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single())
.map(discarded -> {

Code example source: com.github.davidmoten/state-machine-runtime

@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
  return Flowable.defer(() -> {
    Worker worker = signalScheduler.createWorker();
    Flowable<Signal<?, Id>> o0 = subject //
        .toSerialized() //
        .toFlowable(BackpressureStrategy.BUFFER) //
        .mergeWith(signals) //
        .doOnCancel(() -> worker.dispose()) //
        .compose(preGroupBy);
    Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> o;
    if (mapFactory != null) {
      o = o0.groupBy(signal -> new ClassId(signal.cls(),
       signal.id()), x -> x, true, 16, mapFactory);
    } else {
      o = o0.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
          Functions.identity());
    }
    return o.flatMap(g -> {
      Flowable<EntityStateMachine<?, Id>> obs = g //
          .flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, g.getKey())) //
          .doOnNext(m -> stateMachines.put(g.getKey(), m)) //
          .subscribeOn(processingScheduler); //
      Flowable<EntityStateMachine<?, Id>> res = entityTransform
          .apply(grouped(g.getKey(), obs));
      return res;
    });
  });
}
