本文整理了Java中io.reactivex.subjects.Subject.toFlowable()
方法的一些代码示例,展示了Subject.toFlowable()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Subject.toFlowable()
方法的具体详情如下:
包路径:io.reactivex.subjects.Subject
类名称:Subject
方法名:toFlowable
暂无
代码示例来源:origin: pwittchen/ReactiveNetwork
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
final String service = Context.CONNECTIVITY_SERVICE;
final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
networkCallback = createNetworkCallback(context);
registerIdleReceiver(context);
final NetworkRequest request =
new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
.addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
.build();
manager.registerNetworkCallback(request, networkCallback);
return connectivitySubject.toFlowable(BackpressureStrategy.LATEST).doOnCancel(new Action() {
@Override public void run() {
tryToUnregisterCallback(manager);
tryToUnregisterReceiver(context);
}
}).startWith(Connectivity.create(context)).distinctUntilChanged().toObservable();
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<WebSocket> connectingSockets() {
return connectedSockets.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<Pair<WebSocket, CloseReason>> closingSockets() {
return closedSockets.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: qyxxjd/BaseProject
@SuppressWarnings("WeakerAccess") public Flowable<Object> toFlowable(BackpressureStrategy strategy) {
return mBus.toFlowable(strategy);
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<Pair<WebSocket, String>> messages() {
return messages.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<WebSocket> connectingSockets() {
return connectedSockets.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<Pair<WebSocket, Throwable>> errors() {
return errors.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<Pair<WebSocket, CloseReason>> closingSockets() {
return closedSockets.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<Pair<WebSocket, String>> messages() {
return messages.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: ch.squaredesk.nova/websockets
@Override
public Flowable<Pair<WebSocket, Throwable>> errors() {
return errors.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: contentful/vault
public static Flowable<SyncResult> observeSyncResults() {
return SYNC_SUBJECT.toFlowable(BackpressureStrategy.BUFFER);
}
代码示例来源:origin: cr330326/DemoComponent
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
*
* @param eventType 事件类型
* @return return
*/
private <T> Flowable<T> toObservable(Class<T> eventType) {
return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
}
代码示例来源:origin: simplezhli/RxPay
/**
* 返回指定类型的带背压的Flowable实例
*
* @param <T>
* @param type
* @return
*/
public <T>Flowable<T> toObservable(Class<T> type){
return mSubject.toFlowable(BackpressureStrategy.BUFFER)
.ofType(type);
}
/**
代码示例来源:origin: LuckSiege/PictureSelectorLight
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
*
* @param eventType 事件类型
* @return return
*/
public <T> Flowable<T> toObservable(Class<T> eventType) {
return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
}
代码示例来源:origin: info.magnolia.ui/magnolia-ui-framework
public Disposable observeNullable(Consumer<T> action) {
return subject
.toFlowable(BackpressureStrategy.LATEST)
.map(optional -> optional)
.subscribe(
optional -> action.accept(optional.orElse(null)),
e -> log.error("Failed to dispatch context property change: {}", e.getMessage(), e));
}
代码示例来源:origin: ch.squaredesk.nova/http
@Override
public <T> Flowable<RpcInvocation<T>> requests(String destination, Class<T> targetType) {
URL destinationAsLocalUrl;
try {
destinationAsLocalUrl = new URL("http", "localhost", destination);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
Flowable retVal = mapDestinationToIncomingMessages
.computeIfAbsent(destination, key -> {
logger.info("Listening to requests on " + destination);
Subject<RpcInvocation> stream = PublishSubject.create();
stream = stream.toSerialized();
NonBlockingHttpHandler httpHandler = new NonBlockingHttpHandler(destinationAsLocalUrl, messageTranscriber, targetType, stream);
httpServer.getServerConfiguration().addHttpHandler(httpHandler, destination);
return stream.toFlowable(BackpressureStrategy.BUFFER)
.doFinally(() -> {
mapDestinationToIncomingMessages.remove(destination);
httpServer.getServerConfiguration().removeHttpHandler(httpHandler);
logger.info("Stopped listening to requests on " + destination);
})
.share();
});
return retVal;
}
代码示例来源:origin: LuckSiege/PictureSelectorLight
/**
* 根据传递的code和 eventType 类型返回特定类型(eventType)的 被观察者
*
* @param code 事件code
* @param eventType 事件类型
*/
private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
.filter(new Predicate<Message>() {
@Override
public boolean test(Message o) throws Exception {
return o.getCode() == code && eventType.isInstance(o.getObject());
}
}).map(new Function<Message, Object>() {
@Override
public Object apply(Message o) throws Exception {
return o.getObject();
}
}).cast(eventType);
}
代码示例来源:origin: cr330326/DemoComponent
/**
* 根据传递的code和 eventType 类型返回特定类型(eventType)的 被观察者
*
* @param code 事件code
* @param eventType 事件类型
*/
private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
.filter(new Predicate<Message>() {
@Override
public boolean test(Message o) throws Exception {
return o.getCode() == code && eventType.isInstance(o.getObject());
}
}).map(new Function<Message, Object>() {
@Override
public Object apply(Message o) throws Exception {
return o.getObject();
}
}).cast(eventType);
}
代码示例来源:origin: akarnokd/akarnokd-misc
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single())
.map(discarded -> {
代码示例来源:origin: com.github.davidmoten/state-machine-runtime
@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
return Flowable.defer(() -> {
Worker worker = signalScheduler.createWorker();
Flowable<Signal<?, Id>> o0 = subject //
.toSerialized() //
.toFlowable(BackpressureStrategy.BUFFER) //
.mergeWith(signals) //
.doOnCancel(() -> worker.dispose()) //
.compose(preGroupBy);
Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> o;
if (mapFactory != null) {
o = o0.groupBy(signal -> new ClassId(signal.cls(),
signal.id()), x -> x, true, 16, mapFactory);
} else {
o = o0.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
Functions.identity());
}
return o.flatMap(g -> {
Flowable<EntityStateMachine<?, Id>> obs = g //
.flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, g.getKey())) //
.doOnNext(m -> stateMachines.put(g.getKey(), m)) //
.subscribeOn(processingScheduler); //
Flowable<EntityStateMachine<?, Id>> res = entityTransform
.apply(grouped(g.getKey(), obs));
return res;
});
});
}
内容来源于网络,如有侵权,请联系作者删除!