我有以下配置。
发布服务器服务:
public class EventConfig {
@Bean
public Sinks.Many<BasicEvent> orderSink() {
return Sinks.many().unicast().onBackpressureBuffer();
}
@Bean
public Supplier<Flux<BasicEvent>> send(Sinks.Many<BasicEvent> sink) {
return sink::asFlux;
}
}
public class BasicEventPublisher {
private final Sinks.Many<BasicEvent> basicSink;
private static final Random random = new Random();
@Scheduled(fixedDelay = 10000)
public void publish() {
int i = random.nextInt();
basicSink.tryEmitNext(new BasicEvent("TEST", i));
log.info("Event sent:" + String.valueOf(i));
}
}
侦听器服务:
public class EventHandler {
@Bean
public Consumer<Message<BasicEvent>> receive() {
return payload -> {
//Acknowledgment acknowledgment = payload.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
log.info(String.valueOf(payload.getPayload().getId()));
//acknowledgment.acknowledge();
};
}
}
对于侦听器服务:
server:
port: 8081
spring:
cloud:
stream:
kafka:
bindings:
receive-in-0:
consumer:
autoCommitOffset: false
binder:
replicationFactor: 1
function:
definition: receive;
bindings:
receive-in-0:
destination: basic-event
对于发布服务器服务:
server:
port: 8080
spring:
cloud:
stream:
kafka:
bindings:
send-out-0:
consumer:
autoCommitOffset: false
binder:
replicationFactor: 1
function:
definition: send;
bindings:
send-out-0:
destination: basic-event
receive-in-0:
destination: basic-event
当我关闭侦听器时,事件仍然会被发布。我期望侦听器服务器在启动时获取这些未处理的事件。但它不起作用。即使侦听器不确认这些事件,也无法获取未处理的事件。
你能帮我解决这个问题吗。
谢谢您。
暂无答案!
目前还没有任何答案,快来回答吧!