这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
public class MyEvent {
/**
* 事件源
*/
private String source;
/**
* 事件被消费的总次数
*/
private AtomicInteger consumeNum;
public MyEvent(String source) {
this.source = source;
consumeNum = new AtomicInteger();
}
/**
* 事件被消费次数加一
* @return
*/
public int addNum() {
return consumeNum.incrementAndGet();
}
/**
* 获取事件被消费次数
* @return
*/
public int getNum() {
return consumeNum.get();
}
@Override
public String toString() {
return "MyEvent{" +
"source='" + source + '\'' +
", consumeNum=" + getNum() +
'}';
}
}
package com.bolingcavalry.event.producer;
import com.bolingcavalry.event.bean.MyEvent;
import io.quarkus.logging.Log;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
@ApplicationScoped
public class MyProducer {
@Inject
Event<MyEvent> event;
/**
* 发送同步消息
* @param source 消息源
* @return 被消费次数
*/
public int syncProduce(String source) {
MyEvent myEvent = new MyEvent("syncEvent");
Log.infov("before sync fire, {0}", myEvent);
event.fire(myEvent);
Log.infov("after sync fire, {0}", myEvent);
return myEvent.getNum();
}
}
package com.bolingcavalry.event.consumer;
import com.bolingcavalry.event.bean.MyEvent;
import io.quarkus.logging.Log;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@ApplicationScoped
public class MyConsumer {
/**
* 消费同步事件
* @param myEvent
*/
public void syncConsume(@Observes MyEvent myEvent) {
Log.infov("receive sync event, {0}", myEvent);
// 模拟业务执行,耗时100毫秒
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数加一
myEvent.addNum();
}
}
package com.bolingcavalry;
import com.bolingcavalry.event.consumer.MyConsumer;
import com.bolingcavalry.event.producer.MyProducer;
import com.bolingcavalry.service.HelloInstance;
import com.bolingcavalry.service.impl.HelloInstanceA;
import com.bolingcavalry.service.impl.HelloInstanceB;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
@QuarkusTest
public class EventTest {
@Inject
MyProducer myProducer;
@Inject
MyConsumer myConsumer;
@Test
public void testSync() {
Assertions.assertEquals(1, myProducer.syncProduce("testSync"));
}
}
public int asyncProduce(String source) {
MyEvent myEvent = new MyEvent(source);
Log.infov("before async fire, {0}", myEvent);
event.fireAsync(myEvent)
.handleAsync((e, error) -> {
if (null!=error) {
Log.error("handle error", error);
} else {
Log.infov("finish handle, {0}", myEvent);
}
return null;
});
Log.infov("after async fire, {0}", myEvent);
return myEvent.getNum();
}
public void aSyncConsume(@ObservesAsync MyEvent myEvent) {
Log.infov("receive async event, {0}", myEvent);
// 模拟业务执行,耗时100毫秒
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数加一
myEvent.addNum();
}
@Test
public void testAsync() throws InterruptedException {
Assertions.assertEquals(0, myProducer.asyncProduce("testAsync"));
// 如果不等待的话,主线程结束的时候会中断正在消费事件的子线程,导致子线程报错
Thread.sleep(150);
}
public class TwoChannelEvent {
/**
* 事件源
*/
private String source;
/**
* 事件被消费的总次数
*/
private AtomicInteger consumeNum;
public TwoChannelEvent(String source) {
this.source = source;
consumeNum = new AtomicInteger();
}
/**
* 事件被消费次数加一
* @return
*/
public int addNum() {
return consumeNum.incrementAndGet();
}
/**
* 获取事件被消费次数
* @return
*/
public int getNum() {
return consumeNum.get();
}
@Override
public String toString() {
return "TwoChannelEvent{" +
"source='" + source + '\'' +
", consumeNum=" + getNum() +
'}';
}
}
package com.bolingcavalry.annonation;
import javax.inject.Qualifier;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Qualifier
@Retention(RUNTIME)
@Target({FIELD, PARAMETER})
public @interface Admin {
}
@Qualifier
@Retention(RUNTIME)
@Target({FIELD, PARAMETER})
public @interface Normal {
}
@ApplicationScoped
public class TwoChannelWithTwoEvent {
@Inject
@Admin
Event<TwoChannelEvent> adminEvent;
@Inject
@Normal
Event<TwoChannelEvent> normalEvent;
/**
* 管理员消息
* @param source
* @return
*/
public int produceAdmin(String source) {
TwoChannelEvent event = new TwoChannelEvent(source);
adminEvent.fire(event);
return event.getNum();
}
/**
* 普通消息
* @param source
* @return
*/
public int produceNormal(String source) {
TwoChannelEvent event = new TwoChannelEvent(source);
normalEvent.fire(event);
return event.getNum();
}
}
注解修饰,相当于为它们添加了不同的标签,在消费的时候也可以用这两个注解来过滤
@ApplicationScoped
public class TwoChannelConsumer {
/**
* 消费管理员事件
* @param event
*/
public void adminEvent(@Observes @Admin TwoChannelEvent event) {
Log.infov("receive admin event, {0}", event);
// 管理员的计数加两次,方便单元测试验证
event.addNum();
event.addNum();
}
/**
* 消费普通用户事件
* @param event
*/
public void normalEvent(@Observes @Normal TwoChannelEvent event) {
Log.infov("receive normal event, {0}", event);
// 计数加一
event.addNum();
}
/**
* 如果不用注解修饰,所有TwoChannelEvent类型的事件都会在此被消费
* @param event
*/
public void allEvent(@Observes TwoChannelEvent event) {
Log.infov("receive event (no Qualifier), {0}", event);
// 计数加一
event.addNum();
}
}
@QuarkusTest
public class EventTest {
@Inject
TwoChannelWithTwoEvent twoChannelWithTwoEvent;
@Test
public void testTwoChnnelWithTwoEvent() {
// 对管理员来说,
// TwoChannelConsumer.adminEvent消费时计数加2,
// TwoChannelConsumer.allEvent消费时计数加1,
// 所以最终计数是3
Assertions.assertEquals(3, twoChannelWithTwoEvent.produceAdmin("admin"));
// 对普通人员来说,
// TwoChannelConsumer.normalEvent消费时计数加1,
// TwoChannelConsumer.allEvent消费时计数加1,
// 所以最终计数是2
Assertions.assertEquals(2, twoChannelWithTwoEvent.produceNormal("normal"));
}
}
/**
* @author will
* @email zq2599@gmail.com
* @date 2022/4/3 10:16
* @description 用同一个事件结构体TwoChannelEvent,分别发送不同业务类型的事件
*/
@ApplicationScoped
public class TwoChannelWithSingleEvent {
@Inject
Event<TwoChannelEvent> singleEvent;
/**
* 管理员消息
* @param source
* @return
*/
public int produceAdmin(String source) {
TwoChannelEvent event = new TwoChannelEvent(source);
singleEvent.select(new AnnotationLiteral<Admin>() {})
.fire(event);
return event.getNum();
}
/**
* 普通消息
* @param source
* @return
*/
public int produceNormal(String source) {
TwoChannelEvent event = new TwoChannelEvent(source);
singleEvent.select(new AnnotationLiteral<Normal>() {})
.fire(event);
return event.getNum();
}
}
@QuarkusTest
public class EventTest {
@Inject
TwoChannelWithSingleEvent twoChannelWithSingleEvent;
@Test
public void testTwoChnnelWithSingleEvent() {
// 对管理员来说,
// TwoChannelConsumer.adminEvent消费时计数加2,
// TwoChannelConsumer.allEvent消费时计数加1,
// 所以最终计数是3
Assertions.assertEquals(3, twoChannelWithSingleEvent.produceAdmin("admin"));
// 对普通人员来说,
// TwoChannelConsumer.normalEvent消费时计数加1,
// TwoChannelConsumer.allEvent消费时计数加1,
// 所以最终计数是2
Assertions.assertEquals(2, twoChannelWithSingleEvent.produceNormal("normal"));
}
}
public void allEvent(@Observes TwoChannelEvent event, EventMetadata eventMetadata) {
Log.infov("receive event (no Qualifier), {0}", event);
// 打印事件类型
Log.infov("event type : {0}", eventMetadata.getType());
// 获取该事件的所有注解
Set<Annotation> qualifiers = eventMetadata.getQualifiers();
// 将事件的所有注解逐个打印
if (null!=qualifiers) {
qualifiers.forEach(annotation -> Log.infov("qualify : {0}", annotation));
}
// 计数加一
event.addNum();
}
《新程序员》:云原生和全面数字化实践
50位技术专家共同创作,文字、视频、音频交互阅读
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://xinchen.blog.csdn.net/article/details/124069627
内容来源于网络,如有侵权,请联系作者删除!