spring 如何使用Sping Boot 构建长期运行的部分异步服务?

y4ekin9u  于 2023-08-02  发布在  Spring
关注(0)|答案(1)|浏览(91)

我将以我对Spring很陌生的事实来开始这篇文章。但我尝试做的是创建一个服务MyService,它通过REST调用周期性地在单个线程上从另一个外部服务SomeService拉取作业,然后将它们分派给另一个“东西”,称为JobRunner,它接受每个作业并对其执行某些操作。我希望JobRunner是异步的,但我不确定在Spring中最正确的方法是什么。来自非IoC世界,流程看起来像这样(Go语法):

for {
    job, err := getJobFromSomeService()

    if job != nil {
        go runJob(job)
    }
}

字符串
在Spring中,这样的布局会是什么样子?我正在考虑使用某种@Scheduled Poller组件,以便在某个时间间隔拉取作业,但我不确定如何在JobRunner组件中访问这些作业。也许可以添加某种线程安全的Queue bean,并将其连接到PollerJobRunner中,作为某种共享内存解决方案?但是我不确定这在Spring等中是否是一个好的实践,甚至不确定如何通过Spring正确地设置它。
我基本上是在寻找关于使用Sping Boot 解决设计问题的正确方法的建议,而不是非“控制反转”解决方案。

r8xiu3jd

r8xiu3jd1#

我认为有两件事需要考虑:
1.调度调用其他服务的作业(使用http客户端库,如resttemplate
1.在另一个线程中运行作业(使用线程池)
在我看来,你可以尝试下面的东西:

@Autowired
private RestTemplate restTemplate;

@Autowired
private ThreadPoolExecutor threadPoolExecutor;

@Scheduled(cron = "0 * * * * ?")
public void runJobs() {

    HttpHeaders headers2 = new HttpHeaders();
    headers2.setContentType(MediaType.APPLICATION_JSON);
    headers2.set("Host", "api.xxx.com");
    headers2.set("Accept", "*/*");
    headers2.set("Authorization", "");
    String requestBody = "some request body";
    HttpEntity<String> entity = new HttpEntity<>(requestBody, headers2);
    ResponseEntity<String> response = restTemplate.exchange("http://api.xxx.com/xxx", HttpMethod.POST, entity, String.class);
    JSONObject jsonObject = new JSONObject(response.getBody());

    JSONObject data = jsonObject.getJSONObject("data");

    JSONArray jsonArray = data.getJSONArray("listJobs");
    for (Object o : jsonArray) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            runOneJob(o);
        }, threadPoolExecutor);
        future.whenComplete((aVoid, throwable) -> {
            if (throwable != null) {
                log.error("runOneJob error", throwable);
            }
        });
    }

}

private void runOneJob(Object o) {
    // do something
}
@Bean
public RestTemplate restTemplate() {

    HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory();

    PoolingHttpClientConnectionManager poolingConnectionManager = new PoolingHttpClientConnectionManager();
    poolingConnectionManager.setMaxTotal(1024);
    poolingConnectionManager.setDefaultMaxPerRoute(128);

    HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
    httpClientBuilder.setConnectionManager(poolingConnectionManager);

    clientHttpRequestFactory.setHttpClient(httpClientBuilder.build());
    clientHttpRequestFactory.setConnectTimeout(100);
    clientHttpRequestFactory.setReadTimeout(5000);

    return new RestTemplate(clientHttpRequestFactory);
}

@Bean
public ThreadPoolExecutor threadPoolExecutor() {
    return new ThreadPoolExecutor(
            10,
            100,
            2L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(300),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
}

字符串

相关问题