spring批处理:一个读取器、复合处理器(两个具有不同实体的类)和两个kafkaitemwriter

72qzrwbm  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(348)

ItemReader 正在从db2读取数据,并给java对象 ClaimDto . 现在 ClaimProcessor 接受…的对象 ClaimDto 然后回来 CompositeClaimRecord 对象,包括 claimRecord1 以及 claimRecord2 它将被发送到两个不同的Kafka主题。如何写作 claimRecord1 以及 claimRecord2 分别到主题1和主题2。

pexxcrt2

pexxcrt21#

只是写一个习惯 ItemWriter 确实如此。

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    for (CompositeClaimRecord record : items) {
       writer1.write(Collections.singletonList(record.claimRecord1));
       writer2.write(Collections.singletonList(record.claimRecord2));

    }
  }
}

或者不是一次写一条记录,而是将一个列表转换成两个列表并传递给另一个列表。但这样的话,错误处理可能有点困难\

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
    List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());

    writer1.write(record1List);
    writer2.write(record2List);

  }
}
llew8vvj

llew8vvj2#

可以将classifiercompositeitemwriter与两个 KafkaItemWriter s作为代表(每个主题一个)。
这个 Classifier 会根据它们的类型来分类( claimRecord1 或者 claimRecord2 )并将它们传送给相应的Kafka条目编写器( topic1 或者 topic2 ).

相关问题