spring批处理远程分区工作进程没有并行启动

r55awzrz  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(185)

我们正在尝试从本地分区移动到远程分区。我们面临的问题是远程步骤分区没有并行运行。
下面是在本地分区中成功并行运行的当前代码

@Bean
@Qualifier("masterStep")
public Step masterStep(final JpaRepositoryItemWriter<TutoPeople> writer) throws Exception {
    return stepBuilderFactory.get("masterStep").partitioner("slaveStep", partitioner()).step(workerStep(writer))
            .gridSize(8)).build();
}

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(30);
    taskExecutor.setCorePoolSize(20);
    //taskExecutor.setQueueCapacity(threadQueuePoolSize);
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

下面是新代码,它不是并行运行,而是按顺序执行。

经理代码-主

@Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow() {
        final KafkaProducerMessageHandler kafkaMessageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        kafkaMessageHandler.setTopicExpression(new LiteralExpression("batchtopic"));
        return IntegrationFlows
                .from(requests())
                .handle(kafkaMessageHandler)
                .get();
    }
   @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    } 
   @Bean
    public ResourcePartitioner partitioner() throws IOException {
        return new ResourcePartitioner().setResources(getResources());
    }

    @Bean
    public Job partitionerJob(final BatchJobExecutionListener listener, final Step managerStep) {
        return jobBuilderFactory.get("partitionerJob")
                .listener(listener)
                .start(managerStep)
                .build();
    }

     @Bean
     public Step managerStep() throws Exception {
            return remoteStepBuilderFactory.get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(8)
                    .outputChannel(requests())
                    .inputChannel(replies())
                    .build();
        }

  private Resource[] getResources() throws IOException {
    final File inputDir = new FileSystemResource(this.dataPath).getFile();
    if (!inputDir.exists() || !inputDir.isDirectory())
        throw new IOException("Bad input configuration");

    return Arrays.stream(inputDir.listFiles()).map(FileSystemResource::new).toArray(FileSystemResource[]::new);
}

工作者代码

@Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }
 @Bean
    public IntegrationFlow inboundFlow() {
        final ContainerProperties containerProps = new ContainerProperties("batchtopic");
        final KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(kafkaFactory, containerProps);
        final KafkaMessageDrivenChannelAdapter kafkaMessageChannel = new KafkaMessageDrivenChannelAdapter(container);

        return IntegrationFlows
                .from(kafkaMessageChannel)
                .channel(requests())
                .get();
    }

 @Bean
 @StepScope
    public FlatFileItemReader<TutoPeople> reader(@Value("#{stepExecutionContext[filePath]}") String filePath) {
        return new FlatFileItemReaderBuilder<TutoPeople>()
                .name("personReader")
                .resource(new FileSystemResource(filePath))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<TutoPeople>() {{
                    setTargetType(TutoPeople.class);
                }})
                .build();
    }

    @Bean
    @StepScope
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    @StepScope
    public JpaRepositoryItemWriter<TutoPeople> writer(final TutoPeopleRepository repository) {
        return new JpaRepositoryItemWriter<>(repository);
    }

    @Bean
    public Step workerStep(final JpaRepositoryItemWriter<TutoPeople> writer) {
        return remoteStepBuilderFactory.get("workerStep")
                .inputChannel(requests())
                .outputChannel(replies())
                .startLimit(5)
                .<TutoPeople, TutoPeople>chunk(5)
                .reader(reader(null))
                .processor(processor())
                .writer(writer)
                .build();
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题