spring-kafka流未处理事件

kzipqqlq  于 2021-07-13  发布在  Java
关注(0)|答案(0)|浏览(313)

我有以下配置。
发布服务器服务:

public class EventConfig {

    @Bean
    public Sinks.Many<BasicEvent> orderSink() {
      return Sinks.many().unicast().onBackpressureBuffer();
    }

    @Bean
    public Supplier<Flux<BasicEvent>> send(Sinks.Many<BasicEvent> sink) {
      return sink::asFlux;
    }
  }

public class BasicEventPublisher {

  private final Sinks.Many<BasicEvent> basicSink;
  private static final Random random = new Random();

  @Scheduled(fixedDelay = 10000)
  public void publish() {
    int i = random.nextInt();
    basicSink.tryEmitNext(new BasicEvent("TEST", i));
    log.info("Event sent:" + String.valueOf(i));
  }
}

侦听器服务:

public class EventHandler {

    @Bean
    public Consumer<Message<BasicEvent>> receive() {
      return payload -> {
        //Acknowledgment acknowledgment = payload.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        log.info(String.valueOf(payload.getPayload().getId()));
        //acknowledgment.acknowledge();
      };
    }

}

对于侦听器服务:

server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        bindings:
          receive-in-0:
            consumer:
              autoCommitOffset: false
        binder:
          replicationFactor: 1
      function:
        definition: receive;
      bindings:
        receive-in-0:
          destination: basic-event

对于发布服务器服务:

server:
  port: 8080
spring:
  cloud:
    stream:
      kafka:
        bindings:
          send-out-0:
            consumer:
              autoCommitOffset: false
        binder:
          replicationFactor: 1
      function:
        definition: send;
      bindings:
        send-out-0:
          destination: basic-event
        receive-in-0:
          destination: basic-event

当我关闭侦听器时,事件仍然会被发布。我期望侦听器服务器在启动时获取这些未处理的事件。但它不起作用。即使侦听器不确认这些事件,也无法获取未处理的事件。
你能帮我解决这个问题吗。
谢谢您。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题