kafka 安装入门

kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。

(1)zookeeper需要java环境

# yum -y install java-1.8.0

解压到指定目录

tar -xzf kafka_2.13-2.8.0.tgz -C /opt/

(2)这里kafka下载包已经包括zookeeper服务,所以只需修改配置文件,启动即可。

如果需要下载指定zookeeper版本;可以单独去zookeeper官网http://mirrors.shu.edu.cn/apache/zookeeper/下载指定版本。

[root@~]# cd /data/kafka_2.13-2.8.0/
[root@ kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties
dataDir=/tmp/zookeeper   #数据存储目录
clientPort=2181   #zookeeper端口
maxClientCnxns=0

注:可自行添加修改zookeeper配置

3.3 配置kafka

(1)修改配置文件

[root@ kafka_2.13-2.8.0]# grep "^[^#]" config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

注:可根据自己需求修改配置文件

  •  broker.id:唯一标识ID
  •  listeners=PLAINTEXT://localhost:9092:kafka服务监听地址和端口
  •  log.dirs:日志存储目录
  •  zookeeper.connect:指定zookeeper服务

————————————————————————

2、启动zk(cd /data/kafka_2.13-2.8.0/bin)

./zookeeper-server-start.sh -daemon /data/kafka_2.13-2.8.0/config/zookeeper.properties

3、修改Kafka配置,并启动

./kafka-server-start.sh -daemon /data/kafka_2.13-2.8.0/config/server.properties

4、创建topic

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

5、使用生产者发送消息,每行是一条独立的消息

and  go ….

 

# 创建topic
kafka-topics.sh –zookeeper server01:2181 –create –topic test –replication-factor 1 –partitions 3
# 删除topic (如果kafka配置delete.topic.enable=true,那么可以直接删除topic,执行删除topic命令)
kafka-topics.sh –zookeeper server01:2181 –delete –topic test
# 修改topic的分区,注意:分区数量只能增加,不能减少
kafka-topics.sh –zookeeper server01:2181 –alter –topic test –partitions 5
# 查看所有topic
kafka-topics.sh –zookeeper server01:2181 –list
# 查看所有topic的详细信息
kafka-topics.sh –zookeeper server01:2181 –describe
# 查看指定topic的详细信息
kafka-topics.sh –zookeeper server01:2181 –describe –topic test
# describe命令还提供一些参数,用于过滤输出结果,如:
# –topic-with-overrides:可以找出所有包含覆盖配置的主题,它只会列出包含与集群不一样配置的主题
kafka-topics.sh –zookeeper server01:2181 –describe –topics-with-overrides
# describe有两个参数用于找出有问题的分区
# –unavailable-partitions:列出所有没有首领的分区,这些分区已经处于离线状态,对于生产者和消费者来说是不可用的
# –under-replicated-partitions:列出所有包含不同步副本的分区。
kafka-topics.sh –zookeeper server01:2181 –describe –unavailable-partitions
kafka-topics.sh –zookeeper server01:2181 –describe –under-replicated-partitions

生产者shell命令
kafka-console-producer.sh –broker-list server01:9092 –topic test
消费者shell命令(kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning新版)
kafka-console-consumer.sh –zookeeper server01:2181 –from-beginning –topic test
# 指定group消费组
kafka-console-consumer.sh –zookeeper server01:2181 –from-beginning –group test_group –topic test

# 列出test_group消费组的详细信息
kafka-consumer-groups.sh –zookeeper  server01:2181    –describe        –group     test_group
test                    0              8                       8                  0               –
test                    1              8                    8                  0               –
test                    2              8                        8                  0               –
# CURRENT-OFFSET: 当前消费者群组最近提交的offset,也就是消费者分区里读取的当前位置
# LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量
# LAG:消费者的CURRENT-OFFSET与broker的LOG-END-OFFSET之间的差距

# 删除消费者群组
kafka-consumer-groups.sh –zookeeper server01:2181 –delete –group test_group
# 删除消费者群组中的topic
kafka-consumer-groups.sh –zookeeper server01:2181 –delete –group test_group –topic test

 

发表评论