java Spring批处理和多线程步骤

wvmv3b1j  于 2022-10-30  发布在  Java
关注(0)|答案(2)|浏览(180)

我目前正在处理一个批处理,该批处理使用来自具有数百万行的大型SQL数据库的数据。
它在处理器中进行一些处理,包括通过带有连接的大型sql查询对从读取器检索到的行进行分组。
然后编写器将结果写入另一个数据表。
问题在于此批处理存在性能问题,因为Sql选择查询需要花费大量时间,并且这些步骤不是在多线程中执行的。
因此,我想以多线程方式运行它们,但问题是,这些步骤通过计算所有具有相同类型的行的总数来对行进行分组。
所以如果我把它放在多线程中,我怎么能做到呢,当每个分区都要在不同的线程中处理时,我知道有数百万行,我不能存储在上下文中,以便在步骤之后检索它们,并进行分组,而且我也不能将它们保存在数据库中,因为它有数百万行,你知道我怎么做吗?我希望我能很好地解释我的问题。提前感谢你的帮助

b1zrtrql

b1zrtrql1#

我有一个类似的任务,像你的,不像我们使用java 1.7和spring 3.x。我可以提供一个配置在xml中,所以也许你将能够使用注解配置为此我没有尝试。

<batch:job id="dualAgeRestrictionJob">
    <-- use a listner if you need -->
    <batch:listeners>
        <batch:listener ref="dualAgeRestrictionJobListener" />
    </batch:listeners>
    <!-- master step, 10 threads (grid-size) -->
    <batch:step id="dualMasterStep">
        <partition step="dualSlaveStep"
            partitioner="arInputRangePartitioner">
            <handler grid-size="${AR_GRID_SIZE}" task-executor="taskExecutor" />
        </partition>
    </batch:step>   
</batch:job>
<-- here you define your reader processor and writer and the commit interval -->
<batch:step id="dualSlaveStep">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="arInputPagingItemReader"
            writer="arOutputWriter" processor="arInputItemProcessor"
            commit-interval="${AR_COMMIT_INTERVAL}" />
    </batch:tasklet>
</batch:step>
<!-- The partitioner -->
<bean id="arInputRangePartitioner" class="com.example.ArInputRangePartitioner">
    <property name="arInputDao" ref="arInputJDBCTemplate" />
    <property name="statsForMail" ref="statsForMail" />
</bean>
<bean id="taskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="${AR_CORE_POOL_SIZE}" />
    <property name="maxPoolSize" value="${AR_MAX_POOL_SIZE}" />
    <property name="allowCoreThreadTimeOut" value="${AR_ALLOW_CORE_THREAD_TIME_OUT}" />
</bean>
<bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="kvrDatasource" />
</bean>

分区程序进行查询以计算行数并为每个线程创建块:

public class ArInputRangePartitioner implements Partitioner {

    private static final Logger logger = LoggerFactory.getLogger(ArInputRangePartitioner.class);

    private ArInputDao arInputDao;

    private StatsForMail statsForMail;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        // You can make a query and then divede the from to for each thread
        Map<Integer,Integer> idMap = arInputDao.getOrderIdList();
        Integer countRow = idMap.size();
        statsForMail.setNumberOfRecords( countRow );  
        Integer range = countRow / gridSize;
        Integer remains = countRow % gridSize;
        int fromId = 1;
        int toId = range;
        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();
            if(i == gridSize) {
                toId += remains;
            }
            logger.info("\nStarting : Thread {}", i);
            logger.info("fromId : {}", idMap.get(fromId) );
            logger.info("toId : {}", idMap.get(toId) );
            value.putInt("fromId", idMap.get(fromId) );
            value.putInt("toId", idMap.get(toId) );
            value.putString("name", "Thread" + i);
            result.put("partition" + i, value);
            fromId = toId + 1;
            toId += range;
        }
        return result;
    }

    public ArInputDao getArInputDao() {
        return arInputDao;
    }

    public void setArInputDao(ArInputDao arInputDao) {
        this.arInputDao = arInputDao;
    }

    public StatsForMail getStatsForMail() {
        return statsForMail;
    }

    public void setStatsForMail(StatsForMail statsForMail) {
        this.statsForMail = statsForMail;
    }

}

以下是读取器和写入器的配置:

<bean id="arInputPagingItemReader" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step" >
    <property name="dataSource" ref="kvrDatasource" />
    <property name="queryProvider">
        <bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean" >
            <property name="dataSource" ref="kvrDatasource" />
            <property name="selectClause" value="${AR_INPUT_PAGING_ITEM_READER_SELECT}" />
            <property name="fromClause" value="${AR_INPUT_PAGING_ITEM_READER_FROM}" />          <property name="whereClause" value="${AR_INPUT_PAGING_ITEM_READER_WHERE}" />
            <property name="sortKey" value="${AR_INPUT_PAGING_ITEM_READER_SORT}" />
        </bean>
    </property>
    <!-- Inject via the ExecutionContext in rangePartitioner -->
    <property name="parameterValues">
        <map>
            <entry key="fromId" value="#{stepExecutionContext[fromId]}" />
            <entry key="toId" value="#{stepExecutionContext[toId]}" />
        </map>
    </property>
    <property name="pageSize" value="${AR_PAGE_SIZE}" />
    <property name="rowMapper" ref="arOutInRowMapper" />
</bean>
<bean id="arOutputWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter"
        scope="step">
    <property name="dataSource" ref="kvrDatasource" />
    <property name="sql" value="${SQL_AR_OUTPUT_INSERT}"/>
    <property name="itemSqlParameterSourceProvider">
        <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
</bean>

也许有人知道如何用现代的Spring批/Spring Boot来转换这个
PS:不要使用太多线程,否则Spring批处理会浪费很多时间来填充它自己的表。你必须做一些基准测试来理解正确的配置
我还建议不要对数百万行使用jpa/hib,在我的例子中,我使用的是jdbcTemplate

EDIT有关注解配置,请参见此问题

使用分区程序进行配置的示例

@Configuration
@RequiredArgsConstructor
public class JobConfig {

    private static final Logger log = LoggerFactory.getLogger(JobConfig.class);

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Value(value = "classpath:employees.csv")
    private Resource resource;

    @Bean("MyJob1")
    public Job createJob(@Qualifier("MyStep1") Step stepMaster) {
        return jobBuilderFactory.get("MyJob1")
            .incrementer(new RunIdIncrementer())
            .start(stepMaster)
            .build();
    }

    @Bean("MyStep1")
    public Step step(PartitionHandler partitionHandler, Partitioner partitioner) {
        return stepBuilderFactory.get("MyStep1")
            .partitioner("slaveStep", partitioner)
            .partitionHandler(partitionHandler)
            .build();
    }

    @Bean("slaveStep")
    public Step slaveStep(FlatFileItemReader<Employee> reader) {
        return stepBuilderFactory.get("slaveStep")
            .<Employee, Employee>chunk(1)
            .reader(reader)
            .processor((ItemProcessor<Employee, Employee>) employee -> {
                System.out.printf("Processed item %s%n", employee.getId());
                return employee;
            })
            .writer(list -> {
                for (Employee item : list) {
                    System.out.println(item);
                }
            })
            .build();
    }

    @Bean
    public Partitioner partitioner() {
        return gridSize -> {
            Map<String, ExecutionContext> result = new HashMap<>();
            int lines = 0;
            try(BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream()))) {
                while (reader.readLine() != null) lines++;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            int range = lines / gridSize;
            int remains = lines % gridSize;
            int fromLine = 0;
            int toLine = range;
            for (int i = 1; i <= gridSize; i++) {
                if(i == gridSize) {
                    toLine += remains;
                }
                ExecutionContext value = new ExecutionContext();
                value.putInt("fromLine", fromLine);
                value.putInt("toLine", toLine);
                fromLine = toLine;
                toLine += range;
                result.put("partition" + i, value);
            }
            return result;
        };
    }

    @StepScope
    @Bean
    public FlatFileItemReader<Employee> flatFileItemReader(@Value("#{stepExecutionContext['fromLine']}") int startLine, @Value("#{stepExecutionContext['toLine']}") int lastLine) {
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
        reader.setResource(resource);

        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setFieldSetMapper(fieldSet -> {
            String[] values = fieldSet.getValues();
            return Employee.builder()
                    .id(Integer.parseInt(values[0]))
                    .firstName(values[1])
                    .build();
        });

        lineMapper.setLineTokenizer(new DelimitedLineTokenizer(";"));
        reader.setLineMapper(lineMapper);
        reader.setCurrentItemCount(startLine);
        reader.setMaxItemCount(lastLine);

        return reader;
    }

    @Bean
    public PartitionHandler partitionHandler(@Qualifier("slaveStep") Step step, TaskExecutor taskExecutor) {
        TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();

        taskExecutorPartitionHandler.setTaskExecutor(taskExecutor);
        taskExecutorPartitionHandler.setStep(step);
        taskExecutorPartitionHandler.setGridSize(5);

        return taskExecutorPartitionHandler;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(5);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

}
swvgeqrz

swvgeqrz2#

我们有一个类似的使用案例,我必须从基于特定标准阅读数百万条记录开始,作为来自休息端点的输入,并使用20-30个线程并行处理这些记录,以满足极端的期限。但随后的挑战是,对数据库进行同样复杂的查询,然后进行分区,以在生成的线程之间共享。

  • 更好的解决方案:我们通过阅读一次数据,然后在内部对其进行分区并将其传递给启动的线程来解决此问题。

一个典型的批处理过程的目标是-〉读取、进行一些http调用/操作数据,并将其写入响应日志表。
Spring批提供了跟踪已处理记录的功能,以便可以启动重新启动,以挑选剩余批次进行处理。另一种方法是在主表中设置一个标志,将记录标记为已处理,以便在重新启动期间不必挑选。
面临的多重挑战包括:

  • 支持查询读取器中的连接
  • 数据分区。
  • 再次处理同一记录
  • 进入多处理-〉

假设您有10000条记录,需要并行处理5条记录。
可以实施多种创造性解决方案,但最常用的两种适合所有使用情形的解决方案是

  • 对数据进行分区。
  • 如果是数值型,则根据索引数据的值的模对数据进行分区。

考虑到机器所能提供的内存,可以选择合适的线程数。例如,每个线程可以处理2000条记录。
分割是分割范围的行程,并允许每个步骤执行行程在自己的执行绪中选取并执行。对于上述步骤,我们需要分割这些范围,并在查询执行时传递它,让它撷取范围的记录,并在个别的执行绪中继续行程。

  • 执行绪0:1-2000
  • 螺纹1:2001-4000
  • 螺纹2:4001-6000
  • 螺纹3:6001-8000
  • 螺纹4:8001-10000

另一种划分逻辑是将线程0分配到4,并将查询基分配为该数字的模。但这种方法的一个缺点是,一个特定的范围将接收比其他范围更多的负载,而前一种方法将确保每个人都得到公平的份额。
拆分的数据将被传递到单独的线程,该线程将开始处理该数据,并在步骤中提到的提交间隔(块大小)写入数据。
编码:

  • 读取器
@Bean
@StepScope
public JdbcPagingItemReader<YourDataType> dataReaders(
    @Value("#{jobParameters[param1]}") final String param1, 
    @Value("#{stepExecutionContext['modulo']}") Long modulo) throws Exception {
    logger.info("Thread started reading for modulo index : " + modulo);
    JdbcPagingItemReader<YourDataType> reader = new JdbcPagingItemReader <> ();
    reader.setDataSource(getDataSource());
    reader.setRowMapper(new YourDataTypeRowMapper());
    reader.setQueryProvider(queryProvider(param1, modulo));
    return reader;

public OraclePagingQueryProvider queryProvider(String param1, Long modulo) throws Exception {
    OraclePagingQueryProvider provider = new OraclePagingQueryProvider();
    provider.setSelectclause("your elements to query");
    provider.setFromClause("your tables/ joined tables");
    provider.setWhereclause("where clauses AND MOD (TO_NUMBER(yourkey) = " + modulo);
    Map<String,Order> sortkeys = new HashMap<>();
    sortKeys.put("yoursortkey", Order.ASCENDING);
    provider.setSortKeys(sortKeys);
    return provider;
}

示例数据读取器-〉param 1是用户想要输入的任何参数。modulo是一个步骤执行参数-从Partitioner对象传递。
如果用于模5,则Paritator对象将具有模0| 1| 2| 3| 4,这将产生5个线程,这些线程将与读取器交互并为划分的集合获取数据。

  • 作者
@Bean
public JdbcbatchItemWriter<YourDataType> dataWriter() throws Exception {
    logger.info("Initializing data writer"); 
    JdbcBatchItemWriter<YourDataType> databaseItemWriter = new JdbcBatchItemWriter<>(); 
    databaseItemWriter.setDataSource(injectyourdatasourcehere); 
    databaseItemWriter.setsql(INSERT_QUERY_HERE); 
    ItemPreparedStatementsetter<RespData> ps = new YourResponsePreparedStatement(); 
    databaseItemWriter.setItemPreparedStatementsetter(ps); 
    return databaseItemWriter;
}

public class Your ResponsePreparedStatement implements ItemPreparedStatementSetter<RespData> {
    public void setValues (RespData respData, PreparedStatement preparedStatement)throws SQLException {
        preparedStatement.setString(1, respData.getYourData());
    }
}

响应写入器,用于记录对任何表的响应,以记录处理的数据,用于分析或业务报告。

  • 处理器
@Bean 
public ItemProcessor<YourDataType,RespData> processor() {

   return new YOURProcessor();

}

将写入数据操作的核心逻辑的处理器。返回的响应属于数据写入器所需的类型。

  • 如果您希望跳过Spring批次表自动创建,覆盖批次配置将解决此问题。
@Configuration
    @EnableAutoConfiguration
    @EnableBatchProcessing
    public class BatchConfiguration extends DefaultBatchConfigurer {

    @Override
    public void setDataSource(DataSource dataSource) {}
    }

否则可能遇到这样的异常:
(java.lang.Thread.run:829)[?:?]导致原因:异常错误:准备好的语句回调; SQL [将值(?,?,?,?)插入批作业示例(作业示例标识,作业名称,作业关键字,版本)]; ORA-08177:无法序列化此事务处理访问;嵌套的异常是java.sql.SQLException:ORA-08177:无法序列化此事务处理访问
列范围分区程序可以创建为:

@Component 
public class ColumnRangePartitioner implements Partitioner { 
Map<String,ExecutionContext> result = new HashMap(); 

@Override 
public Map<String,ExecutionContext> partition(int gridsize) {
    Map<String,ExecutionContext> result = new HashMap<>(); 
     int start = 0; 
     while (start < gridSize) { 
     ExecutionContext value = new ExecutionContext(); 
     result.put("partition : " + start, value);
     value.putInt("modulo", start); 
     start += 1;
   }
  return result;
 }
}
  • 设置作业和步骤

我们的工作将集中于执行步骤1-它将根据提供的分区器(这里是columnrange分区器)产生线程来处理该步骤。
网格大小是指并行线程数(使用计算模数);
每一个processStep步骤都是一系列阅读指定模数的特定线程的数据、处理数据然后写入数据的过程。

@Bean
public ColumnRangePartitioner getParitioner () throws Exception {
    ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
    return columnRangePartitioner;
}

@Bean
public Step step1(@Qualifier("processStep") Step processStep, 
                StepBuilderFactory stepBuilderFactory) throws Exception {
return stepBuilderFactory.get("step1")
                         .listener(jobCompletionNotifier)
                         .partitioner(processStep.getName(),getParitioner())
                         .step(processStep)
                         .gridSize(parallelThreads)
                         .taskExecutor(taskExecutor())
                         .build();
}

@Bean
public Step processStep(
@Qualifier("DataReader") ItemReader<ReadType>  reader, 
@Qualifier("LogWRITE") ItemWriter<WriterType> writer, 
StepBuilderFactory stepBuilderFactory) throws Exception {
return stepBuilderFactory.get("processStep")
                        .<ReadType,WriterType> chunk(1)
                        .reader(reader)
                        .processor(processor())
                        .writer (writer)
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(exceptionLimit)
                        .build();

}

@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
    return asyncTaskExecutor;
}

@Bean
public Job our JOB (@Qualifier("step1") Step step1, JobBuilderFactory jobBuilderFactory) throws Exception {
return jobBuilderFactory.get("ourjob")
                        .start(step1)
                        .incrementer(new RunIdIncrementer())
                        .preventRestart()
                        .build();   
}

这可能是一个常见的Spring批处理解决方案,但也适用于涉及常用的基于SQL DB/ java的解决方案的每个迁移需求。

  • 我们确实向应用程序添加了自定义设置

避免再次执行联接查询,然后进行过滤。复杂的联接可能会影响数据库性能。因此,一个更好的解决方案是提取数据一次,然后在内部拆分它。应用程序使用的内存会很大,哈希Map将填充您的查询将提取的所有数据,但java能够处理这些。可以将获取的数据传递给ListItemReader,以便并行处理该特定线程的数据列表。

对于处理并行请求(不是线程,而是对该应用程序的并行API调用),可以进行修改,以便只处理一次某个查询,使用信号量保持对该查询的锁定,以便其他线程等待该查询。一旦释放锁定,这些等待线程将发现数据存在,并且数据库将不会再次被查询。
以上实现的代码对于这个博客范围来说可能会很复杂。请随意询问您的应用程序是否需要任何用例。
我很乐意解决任何关于同样的问题。请随时联系我(Akshay),地址是gmail.com,或联系我的同事(萨加尔),地址是sagarnagdev61@gmail.com

相关问题