kubernetes AWS EKS中的多线程

14ifxucb  于 2023-06-21  发布在  Kubernetes
关注(0)|答案(1)|浏览(158)

我正在使用Spring Batch和Kubernetes Cron Manager来调度和运行作业。该作业涉及调用外部API来读取、处理和写入数据。但是,当我使用200,000个项目的数据集执行作业时,需要花费非常长的时间才能完成,至少需要5个小时。
在我的配置中,我在Kubernetes集群中设置了一个带有单个pod的单个副本集,并且我还配置了Spring Batch以使用40个任务执行器进行并发处理。尽管有这些设置,作业执行时间仍然非常慢。
如果您对如何提高我的工作执行时间有任何见解或建议,我将不胜感激。在这种情况下使用Spring Batch和Kubernetes Cron Manager时,我应该考虑哪些特定的优化或最佳实践?
这是我的spring batch kubernetes配置

resources:
    requests:
      cpu: 8
      memory: 8Gi
    limits:
      cpu: 16
      memory: 16Gi

这是我的任务执行者

@Bean
    @StepScope
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(40);
        executor.setMaxPoolSize(40);
        executor.setThreadNamePrefix("spring_batch_worker-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

这是我的作家

public class Writer<T> implements ItemWriter<String> {
        private static final Logger logger = LoggerFactory.getLogger(Writer.class);

        private final String paymentType;
        private final String type;

        public Writer(String paymentType,
                      String type) {
                this.paymentType = paymentType;
                this.type = type;
        }

        @Autowired
        private TaskExecutor taskExecutor;

        @Autowired
        private Client client;

        @Override
        public void write(List<? extends String> users) throws Exception {
                for (String userId : users) {
                        taskExecutor.execute(() -> {
                                String currentThreadName = Thread.currentThread()
                                                                 .getName();
                                try {
                                        logger.info("action", "repayment_processing_item",
                                                "threadName", currentThreadName,
                                                "userId", userId,
                                                "paymentType", paymentType,
                                                "type", type);
                                        client.makePayment(userId, paymentType, type);
                                } catch (Exception e) {
                                        logger.error("action", "repayment_failed_to_process_item",
                                                "errorMessage", GeneralUtil.getErrorMessage(e),
                                                "threadName", currentThreadName,
                                                "userId", userId,
                                                "paymentType", paymentType,
                                                "type", type);
                                }
                        });
                }
        }
}
33qvvth1

33qvvth11#

您共享的不是40 task executors,而是一个具有40个线程池的任务执行器。这是不同的。
此外,您将提交项以作为任务写入项编写器中的不同线程。像并发这样的技术问题应该由框架来处理,如果在步骤本身上设置任务执行器,Spring Batch已经提供了并发处理。条目编写器应该专注于编写条目,例如:

@Override
public void write(List<? extends String> users) throws Exception {
   for (String userId : users) {
      client.makePayment(userId, paymentType, type);
   }
}

现在,如果您想在不同的Pod上扩展这样的流程,您可以使用分区作业,其中每个分区由Pod处理。

相关问题