kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。
(1)zookeeper需要java环境
# yum -y install java-1.8.0
解压到指定目录
tar -xzf kafka_2.13-2.8.0.tgz -C /opt/
(2)这里kafka下载包已经包括zookeeper服务,所以只需修改配置文件,启动即可。
如果需要下载指定zookeeper版本;可以单独去zookeeper官网http://mirrors.shu.edu.cn/apache/zookeeper/下载指定版本。
[root@~]
# cd /data/kafka_2.13-2.8.0/
[root@ kafka_2.11-2.1.0]
# grep "^[^#]" config/zookeeper.properties
dataDir=
/tmp/zookeeper
#数据存储目录
clientPort=2181
#zookeeper端口
maxClientCnxns=0
注:可自行添加修改zookeeper配置
3.3 配置kafka
(1)修改配置文件
[root@ kafka_2.13-2.8.0]
# grep "^[^#]" config/server.properties
broker.
id
=0
listeners=PLAINTEXT:
//localhost
:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.
dirs
=
/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.
dir
=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
注:可根据自己需求修改配置文件
- broker.id:唯一标识ID
- listeners=PLAINTEXT://localhost:9092:kafka服务监听地址和端口
- log.dirs:日志存储目录
- zookeeper.connect:指定zookeeper服务
————————————————————————
2、启动zk(cd /data/kafka_2.13-2.8.0/bin)
./zookeeper-server-start.sh -daemon /data/kafka_2.13-2.8.0/config/zookeeper.properties
3、修改Kafka配置,并启动
./kafka-server-start.sh -daemon /data/kafka_2.13-2.8.0/config/server.properties
4、创建topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
5、使用生产者发送消息,每行是一条独立的消息
and go ….
# 创建topic
kafka-topics.sh –zookeeper server01:2181 –create –topic test –replication-factor 1 –partitions 3
# 删除topic (如果kafka配置delete.topic.enable=true,那么可以直接删除topic,执行删除topic命令)
kafka-topics.sh –zookeeper server01:2181 –delete –topic test
# 修改topic的分区,注意:分区数量只能增加,不能减少
kafka-topics.sh –zookeeper server01:2181 –alter –topic test –partitions 5
# 查看所有topic
kafka-topics.sh –zookeeper server01:2181 –list
# 查看所有topic的详细信息
kafka-topics.sh –zookeeper server01:2181 –describe
# 查看指定topic的详细信息
kafka-topics.sh –zookeeper server01:2181 –describe –topic test
# describe命令还提供一些参数,用于过滤输出结果,如:
# –topic-with-overrides:可以找出所有包含覆盖配置的主题,它只会列出包含与集群不一样配置的主题
kafka-topics.sh –zookeeper server01:2181 –describe –topics-with-overrides
# describe有两个参数用于找出有问题的分区
# –unavailable-partitions:列出所有没有首领的分区,这些分区已经处于离线状态,对于生产者和消费者来说是不可用的
# –under-replicated-partitions:列出所有包含不同步副本的分区。
kafka-topics.sh –zookeeper server01:2181 –describe –unavailable-partitions
kafka-topics.sh –zookeeper server01:2181 –describe –under-replicated-partitions
生产者shell命令
kafka-console-producer.sh –broker-list server01:9092 –topic test
消费者shell命令(kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning新版)
kafka-console-consumer.sh –zookeeper server01:2181 –from-beginning –topic test
# 指定group消费组
kafka-console-consumer.sh –zookeeper server01:2181 –from-beginning –group test_group –topic test
# 列出test_group消费组的详细信息
kafka-consumer-groups.sh –zookeeper server01:2181 –describe –group test_group
test 0 8 8 0 –
test 1 8 8 0 –
test 2 8 8 0 –
# CURRENT-OFFSET: 当前消费者群组最近提交的offset,也就是消费者分区里读取的当前位置
# LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量
# LAG:消费者的CURRENT-OFFSET与broker的LOG-END-OFFSET之间的差距
# 删除消费者群组
kafka-consumer-groups.sh –zookeeper server01:2181 –delete –group test_group
# 删除消费者群组中的topic
kafka-consumer-groups.sh –zookeeper server01:2181 –delete –group test_group –topic test