json 在wildfly 27中从Singleton EJB内部向外部REST资源发出HTTP请求

xu3bshqb  于 2023-07-01  发布在  其他
关注(0)|答案(1)|浏览(122)

我在wildfly 27上的EAR中部署了一个Singleton EJB,使用scheduled方法为部署在同一EAR中的WebSocket提供数据。
应用程序应该从外部资源收集股票和外汇报价,并向用户提供这些数据。目前,数据正在被模拟以允许测试,但最终bean需要从外部读取数据。
我知道在EJB中打开一个URLConnection是很关键的,所以我的问题是,如何才能实现这一点?
我想我需要为此实现一个资源适配器,但我缺乏这样做的知识,而且来自wildfly的文档相当有限。最受欢迎的将是如何实现这一目标的一个例子或替代办法。
Singleton相当简单:

@Startup
@Singleton
public class QuotesJob {
     
    private static final Logger logger = LogManager.getLogger(QuotesJob.class);
    
    private static Map<String, QuotesListener> listeners = new ConcurrentHashMap<>(); 
    private static Random random;
    
    private Map<String, Float> quote = new HashMap<>(); 
    {
        quote.put("EURUSD", 0.00000f);
        quote.put("USDCAD", 0.00000f);
        quote.put("NZDUSD", 0.00000f);
        quote.put("AUDUSD", 0.00000f);
        random = new Random();
    }
    
    
    @PostConstruct
    @Schedule(hour="*", minute="*", second="*/6", persistent=false)
    public void perform() {
        logger.info("quotes update...");
        quote.forEach((k, v) -> {
            quote.put(k, random.nextFloat());  // mock data
        });
        notifyListeners();
    }
    
    private void notifyListeners() {
        logger.info("notify listeners: " + quote);
        listeners.forEach((k, v) -> {
            v.consume(quote);
        });
    }

    public void addQuotesListener(String clazz, QuotesListener listener) {
        listeners.put(clazz, listener);
    }
    
    
    @PreDestroy
    public void endJob() {
        listeners.clear();
    }
    
}
jfewjypa

jfewjypa1#

工作起来很有魅力,再次感谢。我显然想得太复杂了。
这是现在的代码。我使用的是一个测试API,所以响应是无用的,但概念证明。
在重新部署时,辅助方法停止并在部署后继续。不要剩饭
当真实的的API可用时,可以对调度和超时进行微调。

@Startup
@Singleton
public class QuotesJob {

    private static final Logger logger = LogManager.getLogger(QuotesJob.class);

    private String remoteTargets[] = new String[] {
            "https://xyz.free.beeceptor.com",
            "https://xyz.free.beeceptor.com",
            "https://xyz.free.beeceptor.com",
            "https://xyz.free.beeceptor.com"
    };

    private static Map<String, QuotesListener> listeners = new ConcurrentHashMap<>();
    private static Random random;
    private HttpClient client = HttpClient.newHttpClient();
    private AtomicInteger counter = new AtomicInteger(0); 
    private AtomicInteger maxSize = new AtomicInteger(remoteTargets.length); 
    private static final long TIMEOUT = 3000;
    
    
    private Map<String, Float> quote = new HashMap<>();
    {
        quote.put("EURUSD", 0.00000f);
        quote.put("USDCAD", 0.00000f);
        quote.put("NZDUSD", 0.00000f);
        quote.put("AUDUSD", 0.00000f);
        random = new Random();
    }

    @PostConstruct
    @Schedule(hour = "*", minute = "*", second = "*/6", persistent = false)
    public void perform() {
        logger.info("running quotes update...");
        worker();
        quote.forEach((k, v) -> {
            quote.put(k, random.nextFloat());
        });
        notifyListeners();
    }

    private void worker() {
        counter.lazySet(0);;
        while (counter.intValue() < maxSize.intValue()) {
            HttpRequest request = HttpRequest.newBuilder().uri(URI.create(remoteTargets[counter.intValue()]))
                    .header("Accept", "application/json").GET().build();
            CompletableFuture<HttpResponse<String>> responseCompletableFuture = 
                    client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
            responseCompletableFuture
            .orTimeout(TIMEOUT, TimeUnit.MILLISECONDS)
            .whenComplete((resp, t) -> {
                counter.incrementAndGet();
                if (t != null) {
                    logger.error(resp.statusCode() + " Exception: " + t.getMessage());
                    return;
                } 
                if (resp.statusCode()!=200) {
                    logger.info(resp.statusCode() + " Error: " + resp.body());
                    return;
                }
                logger.info(resp.statusCode() + " Result: " + resp.body());
                // -> update quotes with real response
            }).join();
        }
    }

    private void notifyListeners() {
        logger.info("notify listeners: " + quote);
        listeners.forEach((k, v) -> {
            v.consume(quote);
        });
    }

    public void addQuotesListener(String clazz, QuotesListener listener) {
        listeners.put(clazz, listener);
    }

    @PreDestroy
    public void endJob() {
        counter.lazySet(maxSize.intValue());
        listeners.clear();
    }

}

相关问题