[hadoop@hadoop101 app]$ tar -zxvf kafka_2.11-2.3.0.tgz
[hadoop@hadoop101 app]$ mv kafka_2.11-2.3.0 kafka
[hadoop@hadoop101 kafka]$ mkdir logs
# kafka运行日志存放的路径
log.dirs=/home/hadoop/app/kafka/logs
# 配置连接 Zookeeper (集群)地址用,分隔
zookeeper.connect=hadoop101:2181
# 删除 topic 功能使能
delete.topic.enable=true
```
4. 配置环境变量
```sh
[hadoop@hadoop101 app]$ sudo vi /etc/profile
export KAFKA_HOME=/home/hadoop/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[hadoop@hadoop101 app]$ source /etc/profile
```
5. 启动kafka
```sh
# 启动
[hadoop@hadoop101 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
## 关闭
[hadoop@hadoop101 kafka]$ bin/kafka-server-stop.sh stop
```
6. kafka群起脚本
```
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh $i '/home/hadoop/app/kafka/bin/kafka-server-start.sh -daemon
/home/hadoop/app/kafka/config/server.properties'
done
```
## 命令行操作
1. 查看当前服务器中的所有 topic
```sh
#
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list
```
2. 创建 topic
```sh
bin/kafka-topics.sh \
--zookeeper hadoop101:2181 \
--create --replication-factor 1 --partitions 1 \
--topic first
```
参数说明:
--topic 定义 topic 名
--replication-factor 定义副本数,必须小于等于主机数量
--partitions 定义分区数
3. 删除 topic
```sh
bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--delete --topic first
```
4. 生产消息
```sh
bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic first
```
5. 消费消息
在linux服务器另外一个端口打开
```sh
bin/kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092 --topic first
# --from-beginning: 会把topic中以往所有的数据都读取出来。
bin/kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092 --from-beginning --topic first
```
6. 查看某个 Topic 的详情
```sh
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic first
```
7. 修改分区数
```sh
bin/kafka-topics.sh --zookeeper hadoop101:2181 --alter --topic first --partitions 6
```
内容来源于网络,如有侵权,请联系作者删除!