apache druid 与kafka整合使用

x33g5p2x  于2022-02-12 转载在 Druid  
字(5.0k)|赞(0)|评价(0)|浏览(403)

前言

在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用

本篇,我们结合一个实际的简单的应用场景,来说说apache druid如何在实际项目中进行使用

业务场景

如下所示,是一个很常见的数据分析的业务,通常来说,很多实时或准实时的数据(这里理解为外部数据源)需要通过kafka进行中转,即发送到kafka中,

apache druid提供了导入外部数据源的功能,可以接收来自kafka指定topic的数据,然后支持数据分析,将kafka的数据导入到apache druid之后,再通过程序(后台应用)进行数据读取,根据实际的业务需求读取从kafka中摄取的数据进行逻辑处理

最后,应用程序将数据处理之后,进行写库,或者作为大屏展示的数据进行输出

以此为基础,可以将这个过程应用到很多与之相关的场景中,比如源数据是来自大数据引擎的处理结果,或者是python程序爬虫得到的结果…

下面我们来对这个过程从操作到代码实现做一个完整的演示

前置准备

  • docker或者linux环境下搭建的zookeeper和kafka,并提前创建一个topic
  • 启动apache druid服务

做一下kafaka的数据测试,验证topic可以正常的收发消息

1、apache druid 控制台连接kafka

loada data 选择kafka

填写kafka的连接信息即可

然后一路next等待解析,解析完毕,通过顶部的query查看左侧是否出现下面的这个自定义的库名

上面的意思是,将kafka中某个topic中的数据解析到apache druid的库中,然后就可以通过apache druid对导入的数据进行管理和分析了

我们不妨使用sql查询一下,可以看到刚刚我们做测试的数据都展示出来了

2、编写程序,定时向kafka推送消息

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class KfkaTest {

    public static void main(String[] args) {
        AtomicLong atomicLong = new AtomicLong(1);
        Runnable runnable = new Runnable() {
            public void run() {
                //定时向kafka推送消息
                long l = atomicLong.incrementAndGet();
                pushMessage(l);
            }
        };
        ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor();
        // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
        service.scheduleAtFixedRate(runnable, 10, 1, TimeUnit.SECONDS);
    }

    public static void pushMessage(long num) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "IP:9092");
        properties.put("acks", "all");
        properties.put("retries", "3");
        properties.put("batch.size", "16384");
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        //key和value的序列化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //构造生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, Object> map = new HashMap<>();
        map.put("name", "gaoliang:" + num);
        map.put("age", 19);
        map.put("city", "深圳");
        String val = null;
        try {
            val = objectMapper.writeValueAsString(map);
            System.out.println(val);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        producer.send(new ProducerRecord<>("study1", "congge ", val));
        //关闭连接资源
        producer.close();
    }

}

3、通过程序读取apache druid 的数据

关于这一点,方式就很灵活了,是将读取到的数据做何种处理呢?那就要看业务的具体需求了,比如可以直接通过接口将读取到的最新数据返回给页面做展示呢?还是将数据进行逻辑处理之后入库呢?还是交给其他的服务进一步使用呢?通常来说,进行读取之后,写库和展示的应用场景比较多

下面来演示下,如何在程序中读取apache druid的数据,想必这个是大家关心的

直接在pom文件中添加如下依赖

<dependency>
            <groupId>org.apache.calcite.avatica</groupId>
            <artifactId>avatica-core</artifactId>
            <version>1.15.0</version>
        </dependency>

apache druid官方提供了jdbc的方式对数据进行查询的连接方式,下面直接上代码了

import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;

import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class DruidTest {

    private static final String DRUID_URL = "jdbc:avatica:remote:url=http://IP:8888/druid/v2/sql/avatica/";

    private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();

    /**
     * 打开连接
     * @param
     * @return
     * @throws SQLException
     */
    public static AvaticaConnection connection() throws SQLException {
        Properties properties = new Properties();
        AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
        threadLocal.set(connection);
        return connection;
    }

    /**
     * 关闭连接
     * @throws SQLException
     */
    public static void closeConnection() throws SQLException{
        System.out.println("关闭线程:"+threadLocal.get());
        AvaticaConnection conn = threadLocal.get();
        if(conn != null){
            conn.close();
            threadLocal.remove();
        }
    }

    /**
     * 根据sql查询结果
     * @param
     * @param sql
     * @return
     * @throws SQLException
     */
    public static ResultSet executeQuery (String sql) throws SQLException{
        AvaticaStatement statement = connection().createStatement();
        ResultSet resultSet = statement.executeQuery(sql);
        return resultSet;
    }

    public static void main(String[] args) {
        try {
            String sql = "SELECT * FROM \"study1\" limit 10";
            for (int i = 0; i < 5; i++) {
                ResultSet resultSet = executeQuery(sql);
                System.out.println("开始连接"+i + ";   连接线程:"+threadLocal.get());
                while(resultSet.next()){
                    String name = resultSet.getString("name");
                    System.out.println(name + "   ;   "+ name);
                }
                closeConnection();
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

}

这时候不妨往kafka的study1的topic中再推一条消息

界面上查询,可以看到,数据已经过来了

再次运行程序,也能成功读取到

以上,通过程序结合控制台讲述了如何利用java程序连接kafka和apache druid的一种简单的业务场景,本篇的处理较为简单,未涉及到具体的功能层面的整合,主要是为使用apache druid进行进一步的深入使用做一个铺垫,希望对看到的同学有用!

相关文章