我的设想是: JdbcPagingItemReader
正在从oracledb读取并返回对象,比如“employee”。那么这就是 Employee
'对象被传递给处理器以再次调用db,从多个表中提取更多信息并返回' AggregatedEmployee
'对象(它实际上扩展了employee)。我正在使用 KafkaItemWriter
将处理过的对象写入Kafka而不是写入 AggregatedEmployee
,作者试图自己写“雇员”。
@马哈茂德·本·哈辛:我看到了你对 Spring 批次的很多建议。请分享你的想法。
处理器接口代码:
public interface PageProcessor<T> {
<R extends Employee> R process(T page);
}
步骤bean代码:
@Bean
protected Step step1 (CompositeJdbcPagingItemReader <Employee> reader, KafkaItemWriter <String, AggregatedEmployee> writer) {
return steps.get("step1")
.<Employee, AggregatedEmployee>chunk(5).
reader(reader).
writer(writer).build();
}
processorinterface代码的实现类:
public class EmployeeProcessor implements PageProcessor<Employee> {
private NamedParameterJdbcTemplate jdbcTemplate;
public void setDataSource(DataSource dataSource) {
jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
}
@SuppressWarnings("unchecked")
@Override
public <R extends Employee> R process(Employee page) {
... implementation goes here
}
Kafkaitem作家bean:
@Bean
KafkaItemWriter<String,AggregatedEmployee> writer(){
return new KafkaItemWriterBuilder<String, AggregatedEmployee>()
.kafkaTemplate(aggregatedEmployeekafkaTemplate)
.itemKeyMapper(aggregatedEmployee -> String.valueOf(aggregatedEmployee.getEmployeeId()))
.build();
}
编辑以显示处理器:
public class CompositeJdbcPagingItemReader<T> extends JdbcPagingItemReader<T> {
private PageProcessor<T> pageProcessor;
public void setPageProcessor(PageProcessor<T> pageProcessor) {
this.pageProcessor = pageProcessor;
}
当读卡器bean被创建时,处理器对象也被创建并通过上面显示的setter被设置到读卡器中,employeeprocessor中编写的处理器逻辑也被执行。
错误:
java.lang.ClassCastException: class com.sample.model.Employee cannot be cast to class com.sample.model.AggregatedEmployee (com.sample.model.Employee and com.sample.model.AggregatedEmployee are in unnamed module of loader 'app')
at org.springframework.batch.item.KeyValueItemWriter.write(KeyValueItemWriter.java:43)
1条答案
按热度按时间nuypyhwy1#
您没有在步骤上设置处理器:
您需要设置进行类型转换的处理器
Employee
->AggregatedEmployee
在你的台阶上。