I have a Spring Boot application that ran fine until I added a Kafka consumer and producer to it. The code that ran without any problems has a RestController, shown below:
@RestController
public class OrderResource {
//Get orderheaderkeys for a particular date
//OrderLine
@GetMapping("/orderForDate/{forDate}")
public List<String> findOrderHeaderKeys(@PathVariable String forDate) {
//Some business logic
return keys;
}
}
This REST endpoint returned the expected response. I then added a Kafka producer and consumer:
@Component
public class KafkaProducerClient {
private static Logger logger = LoggerFactory.getLogger(KafkaProducerClient.class);
private KafkaProducer<String, String> producer;
@Value("${kafka.bootstrap.servers}")
private String kafkaBootstrapServers;
@PostConstruct
public void init() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<String, String>(properties);
}
public void sendMessageAsync(String topic, String key, String jsonString) {
logger.info("Sending message async to kafka topic with key = {}", key);
long startTime = System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, jsonString);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
final long timeTaken = System.currentTimeMillis() - startTime;
if (recordMetadata != null) {
logger.info("Producer sent record(key={}, value={}). " +
"Topic={}, Partition={}, Offset={}, timeTaken={}",
record.key(), record.value(), topic, recordMetadata.partition(),
recordMetadata.offset(), String.valueOf(timeTaken));
}
if (exception != null) {
// Pass the exception itself: without a {} placeholder, getMessage() would be silently dropped
logger.error("Exception occurred while posting message", exception);
return;
}
}
});
logger.info("Message sent to kafka topic with key = {}", key);
}
public void sendMessageSync(String topic, String key, String jsonString) {
try {
logger.info("Sending message sync to kafka topic={} with key={}", topic, key);
long startTime = System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, jsonString);
Future<RecordMetadata> future = producer.send(record);
producer.flush();
RecordMetadata recordMetadata = future.get();
final long timeTaken = System.currentTimeMillis() - startTime;
if (recordMetadata != null) {
logger.info(
"Producer sent message by sendMessageSync. record={}. timeTaken={}",
recordMetadata,
String.valueOf(timeTaken));
}
} catch (Exception ex) {
logger.error("Exception occurred....", ex);
}
}
@PreDestroy
private void shutdown(){
producer.close();
}
}
@Component
public class KafkaConsumerClient {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerClient.class);
private KafkaConsumer<String, String> consumer;
@Value("${kafka.bootstrap.servers}")
private String kafkaBootstrapServers;
@Value("${kafka.topic}")
private String topic;
@Value("${zookeeper.groupId}")
private String groupId;
@PostConstruct
public void init() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
logger.info("Key: " + record.key() + ", Value: " + record.value());
//orderResource.saveOrderToSecondaryStore(record.value().toString());
}catch (Exception e){
logger.error("Exception while processing Kafka message", e);
}
}
}
}
}
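After adding this consumer and producer, my application no longer starts. I do not see the following lines, which appear when the application starts normally.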
2019-12-12 15:01:12.090 INFO 38376 --- [restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-12-12 15:01:12.093 INFO 38376 --- [restartedMain] c.w.c.o.p.MySpringApplication : Started MySpringApplication in 15.187 seconds (JVM running for 15.617)
2 answers
Answer #1 (bnlyeluc)
I solved this by moving the consumer polling (the while loop) out of the init method of KafkaConsumerClient.
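The reason this works is that a @PostConstruct method containing while (true) never returns, so Spring cannot finish creating its beans and the embedded Tomcat is never started. The following is only a minimal sketch of the idea, not the answerer's actual code: it uses the Java standard library alone, and BackgroundPoller, its in-memory queue, and awaitProcessed are hypothetical stand-ins. In the real KafkaConsumerClient, init() would be the @PostConstruct method, the loop body would call consumer.poll(Duration.ofMillis(100)), and shutdown() would be the @PreDestroy method.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for KafkaConsumerClient: the poll loop runs on its own thread.
class BackgroundPoller {
    // Stand-in for the Kafka topic; a real client would poll a KafkaConsumer instead.
    private final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Corresponds to the @PostConstruct method: it only schedules the loop and returns.
    public void init() {
        executor.submit(this::pollLoop);
    }

    // The loop that used to live inside init(); it now blocks only the worker thread.
    private void pollLoop() {
        while (running.get()) {
            try {
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    processed.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Stand-in for a message arriving on the topic.
    public void publish(String value) {
        source.add(value);
    }

    public List<String> processed() {
        return processed;
    }

    // Helper for the sketch: waits until `count` records were handled or the timeout expires.
    public boolean awaitProcessed(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (processed.size() < count) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Corresponds to the @PreDestroy method: stops the loop and waits for the thread to end.
    public boolean shutdown() {
        running.set(false);
        executor.shutdown();
        try {
            return executor.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

When adapting this to Kafka, keep in mind that KafkaConsumer is not safe for use from multiple threads, so the consumer should be polled and closed on the worker thread; consumer.wakeup() is the one call intended to be made from another thread to interrupt a blocking poll during shutdown.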
Answer #2 (ny6fqffe)
To deploy a Spring Boot application on a Tomcat server, update pom.xml:
Update the main application class:
Also update the Kafka dependencies and configuration. For more details, see the following link: https://www.confluent.io/blog/apache-kafka-spring-boot-application/