使用javaflowapi。我有一条项链 Processor
s和终端 Subscriber
. 我已经编码好了 Subscriber
对接收到的项目进行计数的 SubmissionPublisher
.
但是,当我从 Processor
(延伸到 SubmissionPublisher
)到 Subscriber
下面,没有输出打印到控制台。
如果我移除 minValue.get()
文本被打印到控制台。为什么调用类变量会导致 onComplete
不按预期执行?
public class MinValue implements Subscriber<Integer> {
private Flow.Subscription subscription;
private AtomicInteger minValue=new AtomicInteger(Integer.MAX_VALUE);
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
if(minValue.get()>item){
System.out.println("Min Value : "+item);
minValue.set(item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error + throwable.getMessage() \nMin Value is : "+minValue.get());
}
@Override
public void onComplete() {
System.out.println("Successful Completion - Min Value is : "+minValue.get());
}
}
编辑-添加最小可复制示例
public class NewClass {
public static void main(String args[]) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new MinValue();
publisher.subscribe(subscriber);
publisher.submit(10);
publisher.submit(11);
publisher.submit(9);
publisher.submit(12);
publisher.submit(8);
publisher.close();
}
}
2条答案
按热度按时间nwlqm0z11#
get()返回
int
当项目是Integer
对象。也许这就是问题所在。snz8szmq2#
问题是在调用订阅服务器oncomplete()之前,主线程正在退出。
如果在调用publisher.close()后添加睡眠,subscriber.oncomplete()将有时间在自己的线程上执行。