Spring @Async注解实现异步处理

x33g5p2x  于2022-09-16 转载在 Spring  
字(9.4k)|赞(0)|评价(0)|浏览(479)

在本文中,我们将探索 Spring @Async 注解。我们将在 @Async 和 **@EnableAsync 注解**的帮助下学习 Spring 中的异步执行。

介绍

Spring 提供了在单独的线程中运行长时间运行的进程的功能。 此功能在扩展服务时很有帮助。通过使用 @Async 和 @EnableAsync 注解,我们可以在后台运行复杂的任务并使用 Java 的 CompletableFuture 接口等待结果。

1. 通过@EnableAsync 启用异步支持

启用异步处理,请将@EnableAsync 注解 添加到配置类中

@Configuration
@EnableAsync
public class ApplicationConfiguration {
    //additional configurations
}

@EnableAsync 注释开启了 Spring 在后台线程池中运行 @Async 方法的能力。在大多数情况下,这就可以启用异步处理,默认情况下,@EnableAsync 会自动检测 Spring 的 @Async 注解。

2. Spring @Async 注解

我们需要将 @Async 注解 添加到我们希望在单独的线程中启用异步处理的方法上。

@Async
public void updateCustomer(Customer customer) {
 //long running background process.
}

在使用这个注解时,我们应该记住一些规则。

  1. @Async 注解必须在公共方法上。 Spring为此注解使用代理,并且它必须是公共的,代理才能生效。
  2. 在同一个类中调用异步方法。它不会生效(这样的方法调用将绕过代理)。
  3. 返回类型为 CompletableFuture 或 Future 的方法。

3. @Async 如何工作

一旦我们在方法上添加了 @Async,spring 框架就会基于 proxyTargetClass 属性创建一个代理。对于此方法的传入请求。

  1. Spring 尝试查找与上下文关联的线程池。它使用这个线程池在单独的线程中提交请求并释放主线程。
  2. Spring 将搜索 TaskExecutor bean 或名为 taskExecutor 的 bean,否则它将回退到 SimpleAsyncTaskExecutor

让我们看看可以使用 @Async 注释的 2 中方式。

3.1 返回无效的方法

如果我们的方法返回类型是 void,我们不需要执行任何额外的步骤。简单添加注释。

@Async
 public void updateCustomer(Customer customer) {
     // run the background process
}

Spring 将在单独的线程中自动启动。

3.2 返回类型的方法

如果方法有返回类型,我们必须用 CompletableFuture 或 Future 包装它。

@Async
public CompletableFuture getCustomerByID(final String id) throws InterruptedException {
    //run the process
    return CompletableFuture.completedFuture(customer);
}

4. 执行者

Spring 需要一个线程池来管理后台进程的线程。它将搜索 TaskExecutor bean 或名为 taskExecutor 的 bean。它将退回到 SimpleAsyncTaskExecutor。有时,我们可能需要根据需要自定义线程池行为,spring 提供了以下 2 个选项来自定义执行器。

在大多数情况下,我们最终会在方法级别使用自定义执行器。在我们研究这两个选项之前,让我们创建一个自定义执行器 bean。

@Bean(name = "threadPoolTaskExecutor")
public Executor asyncExecutor() {
   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
   executor.setCorePoolSize(4);
   executor.setMaxPoolSize(4);
   executor.setQueueCapacity(50);
   executor.setThreadNamePrefix("AsynchThread::");
   executor.initialize();
   return executor;
}
4.1 方法级执行器

使用自定义执行器 bean 名称作为 @Async 的属性:

@Async("threadPoolTaskExecutor")
public CompletableFuture < Customer > getCustomerByID(final String id) throws InterruptedException {
 //background or long running process
}
4.2 在应用层覆盖执行器

在配置类中实现AsyncConfigurer接口,在应用层使用自定义执行器。 getAsyncExecutor() 方法返回应用程序级别的执行程序。

@Configuration
public class ServiceExecutorConfig implements AsyncConfigurer {

 @Override
 public Executor getAsyncExecutor() {
  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  taskExecutor.setCorePoolSize(4);
  taskExecutor.setMaxPoolSize(4);
  taskExecutor.setQueueCapacity(50);
  taskExecutor.initialize();
  return taskExecutor;
 }
}
4.3 多个ThreadPoolTask​​Executors

您可以定义多个执行器 bean,以防您希望为不同的任务使用不同的 ThreadPoolTaskExecutors

@Configuration
@EnableAsync
public class ApplicationConfiguration {

 @Bean(name = "threadPoolTaskExecutor1")
 public Executor executor1() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(4);
  executor.setMaxPoolSize(4);
  executor.setQueueCapacity(50);
  executor.setThreadNamePrefix("CustomExecutor1::");
  executor.initialize();
  return executor;
 }

 @Bean(name = "threadPoolTaskExecutor2")
 public Executor executor2() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(4);
  executor.setMaxPoolSize(4);
  executor.setQueueCapacity(50);
  executor.setThreadNamePrefix("CustomExecutor2::");
  executor.initialize();
  return executor;
 }
}

我们可以这样使用它们:

@Async("threadPoolTaskExecutor1")
public void methodA() {}

@Async("threadPoolTaskExecutor2")
public void methodB() {}

5. 实际应用

到目前为止,我们看到了核心概念和配置,让我们看看 Spring @Async 注释 的作用。我们将从使用 Spring Initilizr 设置应用程序开始。我们可以使用 web 版本,也可以使用 IDE 来构建应用程序。这是 pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.3.1.RELEASE</version>
      <relativePath />
      <!-- lookup parent from repository -->
   </parent>
   <groupId>com.javadevjournal</groupId>
   <artifactId>spring-async</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>Spring @Async for Asynchronous Processing</name>
   <description>Spring @Async for Asynchronous Processing</description>
   <properties>
      <java.version>1.8</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-devtools</artifactId>
         <scope>runtime</scope>
         <optional>true</optional>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
         <exclusions>
            <exclusion>
               <groupId>org.junit.vintage</groupId>
               <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
   </dependencies>
   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
</project>

让我们创建我们的服务类,它将模拟长时间运行的过程:

package com.javadevjournal.customer.service;

import com.javadevjournal.data.customer.Customer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class DefaultCustomerService implements CustomerService {

 private static final Logger LOG = LoggerFactory.getLogger(DefaultCustomerService.class);

 @Override
 @Async("threadPoolTaskExecutor")
 public CompletableFuture < Customer > getCustomerByID(final String id) throws InterruptedException {
  LOG.info("Filling the customer details for id {} ", id);
  Customer customer = new Customer();
  customer.setFirstName("Javadev");
  customer.setLastName("Journal");
  customer.setAge(34);
  customer.setEmail("[email protected]");
  // doing an artificial sleep
  Thread.sleep(20000);
  return CompletableFuture.completedFuture(customer);
 }

 @Override
 @Async("threadPoolTaskExecutor")
 public void updateCustomer(Customer customer) {
  LOG.warn("Running method with thread {} :", Thread.currentThread().getName());
  // do nothing
 }

 @Override
 public Customer getCustomerByEmail(String email) throws InterruptedException {
  LOG.info("Filling the customer details for email {}", email);
  Customer customer = new Customer();
  customer.setFirstName("New");
  customer.setLastName("Customer");
  customer.setAge(30);
  customer.setEmail("[email protected]");
  Thread.sleep(20000);
  return customer;
 }
}

我们通过添加 Thread.sleep(2000) 来延迟响应。这是为了模拟缓慢移动的服务。

5.1 控制器

我们的控制器是一个简单的类,如下:

@RestController
@RequestMapping("/customers")
public class CustomerController {

 @Autowired
 CustomerService customerService;

 @GetMapping("/customer/{id}")
 public CompletableFuture < Customer > getCustomerById(@PathVariable String id) throws InterruptedException {
  return customerService.getCustomerByID(id);
 }

 @PutMapping("/customer/update")
 public void updateCustomer() {
  customerService.updateCustomer(null);
 }

 @GetMapping("/customer/id/{email}")
 public Customer getCustomerByEmail(@PathVariable String email) throws InterruptedException {
  return customerService.getCustomerByEmail(email);
 }
}
5.2 构建和运行应用程序

让我们运行应用程序来看看它的实际效果。应用程序启动并运行后,点击以下 URL http://localhost:8080/customers/customer/12 并检查服务器日志。您将看到类似的输出:

2020-07-10 18:37:10.403  INFO 12056 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-07-10 18:37:10.418  INFO 12056 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 15 ms
2020-07-10 18:37:10.524  INFO 12056 --- [AsynchThread::1] c.j.c.service.DefaultCustomerService     : Filling the customer details for id 12

如果您仔细观察,该请求正在一个新线程 [AsynchThread::1] 中执行。这将长时间运行的进程,因为我们可以在单独的线程中运行该进程而不阻塞主线程。要更详细地验证这一点,请点击以下 URL http://localhost:8080/customers/customer/id/[email protected](服务方法不包含 @Async 注释)。

2020-07-10 18:37:10.418  INFO 12056 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 15 ms
2020-07-10 18:37:10.524  INFO 12056 --- [AsynchThread::1] c.j.c.service.DefaultCustomerService     : Filling the customer details for id 12 
2020-07-10 18:40:33.546  INFO 12056 --- [nio-8080-exec-4] c.j.c.service.DefaultCustomerService     : Filling the customer details for email [email protected]

6. 异常处理

要使用 @Async annotation* 处理 *异常,请记住以下关键点。

1.如果返回类型为CompletableFutureFutureFuture.get()方法会抛出异常。
1.对于void返回类型,我们需要添加额外的配置,因为异常不会传播到调用线程。

要处理 void 返回类型的异常,我们需要通过实现 AsyncUncaughtExceptionHandler 接口来创建异步异常处理程序。

public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    private static final Logger LOG = LoggerFactory.getLogger(CustomAsyncExceptionHandler.class);
    @Override
    public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
        LOG.error("Exception while executing with message {} ", throwable.getMessage());
        LOG.error("Exception happen in {} method ", method.getName());
    }
}

最后一步是在我们的配置类中配置这个 AsyncUncaughtExceptionHandler

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
 return new CustomAsyncExceptionHandler();
}

总结

在本文中,我们讨论了 Spring @Async 注解。我们在本文中介绍了以下几点:

  1. 如何使用@Aync 注解在单独的线程池中运行长时间运行的进程。
  2. Spring中何时使用异步执行支持
    3.自定义线程池的自定义执行器。
    4.如何处理异常。

本文的源代码可在 GitHub 上找到。

相关文章