We are trying to move from local partitioning to remote partitioning. The problem we are facing is that the remote partitioned steps are not running in parallel.
Below is the current code, which runs in parallel successfully with local partitioning:
@Bean
@Qualifier("masterStep")
public Step masterStep(final JpaRepositoryItemWriter<TutoPeople> writer) throws Exception {
return stepBuilderFactory.get("masterStep")
        .partitioner("slaveStep", partitioner())
        .step(workerStep(writer))
        .gridSize(8)
        .taskExecutor(taskExecutor()) // fans the partitions out onto the thread pool below
        .build();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(20);
//taskExecutor.setQueueCapacity(threadQueuePoolSize);
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
Below is the new code; instead of running in parallel, the partitions execute sequentially.
Manager (master) configuration:
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow() {
final KafkaProducerMessageHandler kafkaMessageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
kafkaMessageHandler.setTopicExpression(new LiteralExpression("batchtopic"));
return IntegrationFlows
.from(requests())
.handle(kafkaMessageHandler)
.get();
}
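The kafkaTemplate injected into outboundFlow is not shown. A minimal sketch of what it is assumed to look like, using Spring Kafka's DefaultKafkaProducerFactory and JSON serialization for the StepExecutionRequest payloads (the bootstrap server and serializer choices are placeholders, not part of the original configuration):
@Bean
public ProducerFactory<String, StepExecutionRequest> producerFactory() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, StepExecutionRequest> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}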
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public ResourcePartitioner partitioner() throws IOException {
final ResourcePartitioner partitioner = new ResourcePartitioner();
partitioner.setResources(getResources());
return partitioner;
}
@Bean
public Job partitionerJob(final BatchJobExecutionListener listener, final Step managerStep) {
return jobBuilderFactory.get("partitionerJob")
.listener(listener)
.start(managerStep)
.build();
}
@Bean
public Step managerStep() throws Exception {
return remoteStepBuilderFactory.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(8)
.outputChannel(requests())
.inputChannel(replies())
.build();
}
private Resource[] getResources() throws IOException {
final File inputDir = new FileSystemResource(this.dataPath).getFile();
if (!inputDir.exists() || !inputDir.isDirectory())
throw new IOException("Bad input configuration");
return Arrays.stream(inputDir.listFiles()).map(FileSystemResource::new).toArray(FileSystemResource[]::new);
}
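Not shown above: the remoteStepBuilderFactory the manager step is built from. With remote partitioning this is assumed to be the RemotePartitioningManagerStepBuilderFactory exposed by @EnableBatchIntegration (named RemotePartitioningMasterStepBuilderFactory before Spring Batch 4.3); a sketch of the assumed surrounding configuration class, with illustrative names:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class ManagerConfiguration {

    // Injected by @EnableBatchIntegration; builds manager steps that send
    // StepExecutionRequest messages to workers over the output channel.
    @Autowired
    private RemotePartitioningManagerStepBuilderFactory remoteStepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // partitioner(), managerStep(), outboundFlow(), replies() and partitionerJob() beans as above
}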
Worker configuration:
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow() {
final ContainerProperties containerProps = new ContainerProperties("batchtopic");
final KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(kafkaFactory, containerProps);
final KafkaMessageDrivenChannelAdapter kafkaMessageChannel = new KafkaMessageDrivenChannelAdapter(container);
return IntegrationFlows
.from(kafkaMessageChannel)
.channel(requests())
.get();
}
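The kafkaFactory used by the listener container is also not shown; it is assumed to be a ConsumerFactory able to deserialize the StepExecutionRequest messages, roughly (bootstrap server and group id are placeholders):
@Bean
public ConsumerFactory<String, StepExecutionRequest> kafkaFactory() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-workers");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // allow StepExecutionRequest to be deserialized
    return new DefaultKafkaConsumerFactory<>(props);
}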
@Bean
@StepScope
public FlatFileItemReader<TutoPeople> reader(@Value("#{stepExecutionContext[filePath]}") String filePath) {
return new FlatFileItemReaderBuilder<TutoPeople>()
.name("personReader")
.resource(new FileSystemResource(filePath))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<TutoPeople>() {{
setTargetType(TutoPeople.class);
}})
.build();
}
@Bean
@StepScope
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
@StepScope
public JpaRepositoryItemWriter<TutoPeople> writer(final TutoPeopleRepository repository) {
return new JpaRepositoryItemWriter<>(repository);
}
@Bean
public Step workerStep(final JpaRepositoryItemWriter<TutoPeople> writer) {
return remoteStepBuilderFactory.get("workerStep")
.inputChannel(requests())
.outputChannel(replies())
.startLimit(5)
.<TutoPeople, TutoPeople>chunk(5)
.reader(reader(null))
.processor(processor())
.writer(writer)
.build();
}
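Similarly, the worker-side remoteStepBuilderFactory is assumed to be the RemotePartitioningWorkerStepBuilderFactory provided by @EnableBatchIntegration; a sketch of the assumed worker configuration class, with illustrative names:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class WorkerConfiguration {

    // Injected by @EnableBatchIntegration; builds worker steps that listen for
    // StepExecutionRequest messages on the input channel and report back on the output channel.
    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory remoteStepBuilderFactory;

    // requests(), inboundFlow(), reader(), processor(), writer() and workerStep() beans as above
}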