Spring Boot application does not start with Kafka and RestController

Posted by pzfprimi on 2021-06-04 in Kafka

I have a Spring Boot application that ran fine until I added a Kafka consumer and producer to it. The code that ran without any problems has a RestController, shown below:

@RestController
public class OrderResource {
    //Get orderheaderkeys for a particular date
    //OrderLine
    @GetMapping("/orderForDate/{forDate}")
    public List<String> findOrderHeaderKeys(@PathVariable String forDate) {
        //Some business logic
        return keys;
    }
}

This REST endpoint returns the expected response. I then added a Kafka producer and consumer:

@Component
public class KafkaProducerClient {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerClient.class);
    private KafkaProducer<String, String> producer;

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(properties);
    }

    public void sendMessageAsync(String topic, String key, String jsonString) {
        logger.info("Sending message async to kafka topic with key = {}", key);
        long startTime = System.currentTimeMillis();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, jsonString);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                final long timeTaken = System.currentTimeMillis() - startTime;
                if (recordMetadata != null) {
                    logger.info("Producer sent record(key={}, value={}). " +
                                    "Topic={}, Partition={}, Offset={}, timeTaken={}",
                            record.key(), record.value(), topic, recordMetadata.partition(),
                            recordMetadata.offset(), String.valueOf(timeTaken));
                }
                if (exception != null) {
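                    // Pass the exception itself so SLF4J logs it with its stack trace;
                    // without a {} placeholder the getMessage() argument was silently dropped.
                    logger.error("Exception occurred while posting message", exception);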
                    return;
                }
            }
        });
        logger.info("Message sent to kafka topic with key = {}", key);
    }

    public void sendMessageSync(String topic, String key, String jsonString) {
        try {
            logger.info("Sending message sync to kafka topic={} with key={}", topic, key);
            long startTime = System.currentTimeMillis();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, jsonString);
            Future<RecordMetadata> future = producer.send(record);
            producer.flush();
            RecordMetadata recordMetadata = future.get();
            final long timeTaken = System.currentTimeMillis() - startTime;
            if (recordMetadata != null) {
                logger.info(
                        "Producer sent message by sendMessageSync. record={}. timeTaken={}",
                        recordMetadata,
                        String.valueOf(timeTaken));
            }
        } catch (Exception ex) {
            logger.error("Exception occurred....", ex);
        }

    }

    @PreDestroy
    private void shutdown(){
        producer.close();
    }
}
@Component
public class KafkaConsumerClient {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerClient.class);
    private KafkaConsumer<String, String> consumer;

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.topic}")
    private String topic;

    @Value("${zookeeper.groupId}")
    private String groupId;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records) {
                try {
                    logger.info("Key: " + record.key() + ", Value: " + record.value());
                    //orderResource.saveOrderToSecondaryStore(record.value().toString());
                }catch (Exception e){
                    logger.error("Exception while processing Kafka message", e);
                }
            }
        }
    }
}

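After adding this consumer and producer, my application no longer starts. I don't see the following lines, which appear when the application starts normally:
2019-12-12 15:01:12.090  INFO 38376 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-12-12 15:01:12.093  INFO 38376 --- [  restartedMain] c.w.c.o.p.MySpringApplication            : Started MySpringApplication in 15.187 seconds (JVM running for 15.617)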

Answer 1 (bnlyeluc)

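I solved this by moving the consumer polling (the while loop) out of the init method of KafkaConsumerClient. The loop never returns, so as long as it sat in a @PostConstruct method the application context could never finish starting.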

@PostConstruct
public void init() {
}
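
A minimal sketch of one way to do this, assuming the loop is handed to a background thread so that init() returns at once. To keep it runnable without a broker, a BlockingQueue stands in for the KafkaConsumer, and the names BackgroundPoller, pollLoop and processed are hypothetical. In the real bean, init() would carry @PostConstruct and shutdown() would carry @PreDestroy.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaConsumerClient: init() only starts a worker
// thread and returns, so the caller (Spring, in the real application) can continue.
class BackgroundPoller {
    // Assumption: this queue plays the role of the Kafka topic.
    // The real code would call consumer.poll(Duration.ofMillis(100)) instead.
    private final BlockingQueue<String> source;
    private final List<String> processed = new CopyOnWriteArrayList<>();
    // Single daemon worker thread that owns the poll loop.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "poller");
        t.setDaemon(true);
        return t;
    });
    private volatile boolean running = true;

    BackgroundPoller(BlockingQueue<String> source) {
        this.source = source;
    }

    // Would be annotated with @PostConstruct. Returns immediately.
    void init() {
        executor.execute(this::pollLoop);
    }

    private void pollLoop() {
        while (running) {
            try {
                // Wait up to 100 ms for the next record, then re-check the flag.
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    processed.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Would be annotated with @PreDestroy. Stops the loop and waits briefly.
    void shutdown() {
        running = false;
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> processed() {
        return processed;
    }
}
```

With a real KafkaConsumer, keep in mind that it is not safe for multi-threaded access: create it and poll it on the same worker thread, and call consumer.wakeup() from shutdown() to break out of a blocked poll().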

Answer 2 (ny6fqffe)

To deploy the Spring Boot application on a Tomcat server, update pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
    <scope>provided</scope>
</dependency>

Update the main application class:

@SpringBootApplication
public class Application extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Also update the Kafka dependencies and configuration. For more details, see the following link: https://www.confluent.io/blog/apache-kafka-spring-boot-application/
