我想把Kafka的数据写入mysql数据库。我已经实现了以下代码。
kafkaConsumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, String> record = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record1 : record) {
if (record1.value().length() > 0) {
System.out.println(record1.value());
String value = record1.value();
String[] array = value.split(" ");
String sql = String.format("insert into data(timestamp, LogLevel,CityName,Detail) values ('%s', '%s','%s','%s')", array[0], array[1], array[2], array[3]);
mysqlConnector.statement.executeUpdate(sql);
}
}
}
1条答案
按热度按时间h79rfbju1#
也许重新发明轮子是不值得的。最好的选择是使用著名且经过测试的工具:kafka connect with confluent jdbc sink connector
https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html