kafka快速入门

x33g5p2x  于2022-06-30 转载在 Kafka  
字(2.7k)|赞(0)|评价(0)|浏览(772)

kafka快速入门

一、docker安装kafka集群

# docker直接拉取kafka和zookeeper的镜像
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper 
# 首先需要启动zookeeper,如果不先启动,启动kafka没有地方注册消息
docker run -it --name zookeeper --restart=always -p 12181:2181 -d wurstmeister/zookeeper:latest
# 启动kafka容器,注意需要启动三台,注意端口的映射,都是映射到9092
# 第一台
docker run -dit --name kafka01 --restart=always -p 19092:9092  -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第二台
docker run -dit --name kafka02 --restart=always -p 19093:9092  -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第三台
docker run -dit --name kafka03 --restart=always -p 19094:9092  -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

# 可视化工具
docker run -dit -p --name kafdrop 9000:9000 -e JVM_OPTS="-Xms32M -Xmx64M" -e KAFKA_BROKERCONNECT=1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 -e SERVER_SERVLET_CONTEXTPATH="/" obsidiandynamics/kafdrop

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=192.168.100.31:12181 配置zookeeper管理kafka的路径192.168.100.31:12181

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.31:19092 把kafka的地址端口注册给zookeeper

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

上面端口的映射注意都是映射到Kafka的9092端口上!否则将不能够连接!

二、命令行操作

查看正在运行的容器:docker ps

进入容器:docker exec -it kafka01 /bin/bash

用命令行对topic进行curd

进如kafka安装文件夹 cd /opt/kafka/bin
查看所有topic

kafka-topics.sh --list --zookeeper 1.14.252.45:12181

创建topic

kafka-topics.sh --zookeeper 1.14.252.45:12181 --create --topic first --replication-factor 1 --partitions 3

查询topic详情

kafka-topics.sh --zookeeper 1.14.252.45:12181 --describe --topic first

删除topic

kafka-topics.sh --zookeeper 1.14.252.45:12181 --delete --topic first

发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

kafka-console-producer.sh --broker-list 1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 --topic first

接受消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输 出, 默认是消费最新的消息 。使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息

kafka-console-consumer.sh --bootstrap-server 1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 --topic first --from-beginning

查看消费者和信息

# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 1.14.252.45:19092 --list
# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 1.14.252.45:19092 --describe --group testGroup
  • Currennt-offset: 当前消费组的已消费偏移量
  • Log-end-offset: 主题对应分区消息的结束偏移量(HW)
  • Lag: 当前消费组未消费的消息数

相关文章