spring批处理kafka-kafka到数据库作业

xqnpmsa8  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(611)

我需要一个spring批处理itemreader来使用kafka消息,这些消息的结果需要进一步处理和写入。
下面是我实现的一个条目读取器:

public abstract class KafkaItemReader<T> implements ItemReader<List<T>> {
  public abstract KafkaConsumer<String, T> getKafkaConsumer();

  public abstract String getTopic();

  public abstract long getPollingTime();

  @Override
  public List<T> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    Iterator<ConsumerRecord<String, T>> iterator = getKafkaConsumer()
        .poll(Duration.ofMillis(getPollingTime()))
        .records(getTopic())
        .iterator();
    List<T> records = new ArrayList<>();
    while (iterator.hasNext()) {
      records.add(iterator.next().value());
    }
    return records;
  }
}

以下是spring批处理作业和步骤的bean:

@Bean
  public ItemWriter<List<DbEntity>> databaseWriter(DataSource dataSource) {
    //some item writer that needs to be implmented
    return null;
  }

  @Bean
  public Step kafkaToDatabaseStep(KafkaItemReader kafkaItemReader, //implementation of KafkaItemReader
                                  StepBuilderFactory stepBuilderFactory,
                                  DataSource dataSource) {

    return stepBuilderFactory
        .get("kafkaToDatabaseStep")
        .allowStartIfComplete(true)
        .<List<KafkaRecord>, List<DbEntity>>chunk(100)
        .reader(kafkaItemReader)
        .processor(itemProcessor()) //List<KafkaRecord> to List<DbEntity> converter
        .writer(databaseWriter(dataSource))
        .build();
  }

  @Bean
  public Job kafkaToDatabaseJob(
      @Qualifier("kafkaToDatabaseStep") Step step) {
    return jobBuilderFactory.get("kafkaToDatabaseJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  }

在这里我不知道:
如何提交writer中读取消息的偏移量,因为我只想在完成记录处理后提交。
如何在我的场景中使用jdbccbatchitemwriter作为itemwriter。

nuypyhwy

nuypyhwy1#

即将发布的springbatchv4.2ga将为apachekafka主题提供读写数据支持。您已经可以在4.2.0.m2版本中尝试这一点了。
你也可以看看joshlong的spring tips一期。

相关问题