Reactive Spring Boot + Kafka: HTTP GET request hangs

iszxjhcz, posted on 2021-06-04 in Kafka

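I am trying out a sample reactive integration of Spring Boot with Kafka and Angular, as described in this tutorial: https://raymondhlee.wordpress.com/2019/11/23/a-reactive-stack-with-spring-boot-kafka-and-angular/
However, a GET call to http://localhost:8181/weather never gets a response from the method below, and the request status stays pending.
Controller: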

private Flux<WeatherInfoEvent> bridge;

public WeatherInfoController() {
    // (3) Broadcast to several subscribers
    this.bridge = createBridge().publish().autoConnect().cache(10).log();
}

// (1) Spring MVC annotation
@GetMapping(value = "/weather", produces = "text/event-stream;charset=UTF-8")
public Flux<WeatherInfoEvent> getWeatherInfo() {
    System.out.println(" @$@$#@$ Hey i was called....");
    System.out.println(" @#$#@$# bridge.toString()" + bridge.getPrefetch() + bridge.toString());
    return bridge;
}

private Flux<WeatherInfoEvent> createBridge() {
    Flux<WeatherInfoEvent> bridge = Flux.create(sink -> { // (2)
        processor.register(new WeatherInfoEventListener() {

            @Override
            public void processComplete() {
                sink.complete();
            }

            @Override
            public void onData(WeatherInfoEvent data) {
                System.out.println(" data =" + data);
                sink.next(data);
            }
        });
    });
    return bridge;
}

Service:

public ListenableFuture<SendResult<String, WeatherInfoEvent>> sendMessage(String topic, WeatherInfoEvent message) {
    logger.info(String.format("#### -> Producing message -> %s", message));
    return this.kafkaTemplate.send(topic, message);
}

@Scheduled(fixedDelay = 5000)
public void getWeatherInfoJob() throws IOException {
    logger.info("generate fake weather event");
    // fake event
    WeatherInfoEvent event = new WeatherInfoEvent(RandomUtils.nextLong(0, 100), RandomUtils.nextInt(16, 30));
    ListenableFuture<SendResult<String, WeatherInfoEvent>> future = sendMessage("weather1", event);
    try {
        if (future.get() != null) {
            future.cancel(true);
        }
    } catch (InterruptedException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (ExecutionException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
    try {
        System.out.println(" future :" + future.get().getProducerRecord().toString());
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

Any idea where I might be going wrong?
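
For reference, here is a minimal sketch of the listener bridge used in createBridge() above, written in plain Java without Reactor or Kafka so it can be run on its own. The Processor class here is a hypothetical stand-in (the real processor is not shown in the question), and String replaces WeatherInfoEvent. It only illustrates one property of the pattern: the bridge receives data only if the processor actually calls the registered listener, and anything processed before register() runs is never seen. It may or may not be related to the pending request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class BridgeSketch {

    // Same shape as the WeatherInfoEventListener callbacks in the question.
    interface Listener {
        void onData(String data);
        void processComplete();
    }

    // Hypothetical stand-in for the processor that the controller registers with.
    static class Processor {
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();

        void register(Listener listener) {
            listeners.add(listener);
        }

        // In the real application this would be driven by the Kafka consumer.
        void process(String data) {
            listeners.forEach(l -> l.onData(data));
        }

        void complete() {
            listeners.forEach(Listener::processComplete);
        }
    }

    static List<String> run() {
        Processor processor = new Processor();
        List<String> received = new ArrayList<>();

        // Processed before any listener is registered: nobody receives it.
        processor.process("early");

        processor.register(new Listener() {
            @Override
            public void onData(String data) {
                received.add(data); // corresponds to sink.next(data)
            }

            @Override
            public void processComplete() {
                received.add("<complete>"); // corresponds to sink.complete()
            }
        });

        processor.process("20C");
        processor.complete();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the controller, the lambda passed to Flux.create runs only when the flux is subscribed, and autoConnect() subscribes upstream when the first subscriber arrives, so register() is not called until the first GET request. If " data =" is never printed after that, the processor is not invoking the listener, which would point at the consuming side rather than at the controller.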
