nullpointerexception

vyu0f0g1  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(326)

我想把Kafka的数据写入mysql数据库。我已经实现了以下代码。

kafkaConsumer.subscribe(Arrays.asList(topicName));
    try {

        while (true) {
            ConsumerRecords<String, String> record = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record1 : record) {
                if (record1.value().length() > 0) {
                    System.out.println(record1.value());
                    String value = record1.value();
                    String[] array = value.split(" ");
                    String sql = String.format("insert into data(timestamp, LogLevel,CityName,Detail) values ('%s', '%s','%s','%s')", array[0], array[1], array[2], array[3]);
                    mysqlConnector.statement.executeUpdate(sql);

                }
            }
        }
h79rfbju

h79rfbju1#

也许重新发明轮子是不值得的。最好的选择是使用著名且经过测试的工具:kafka connect with confluent jdbc sink connector
https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html

相关问题