kafka集群安装部署、监控与参数优化

1、kafka 集群安装

kafka 的集群部署方式网上有很多,此处只列出主要安装步骤



#所有都部署到/data 目录下,可根据自己面要安装在不同目录
#本机 ip 为 192.168.1.100
cd /data

##1、先下载 zookeeper   https://zookeeper.apache.org/releases.html#download
wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz

##2、部署 zookeeper(本次单机部署 3 个节点伪集群)
tar -xzvf apache-zookeeper-3.8.3-bin.tar.gz
mv apache-zookeeper-3.8.3-bin zookeeper1
cd zookeeper1
mkdir data
mv conf/zoo_simple.cfg conf/zoo.cfg

#编辑 zoo.cfg
  vim zoo.cfg
  tickTime=2000
  initLimit=10
  syncLimit=5
  dataDir=/data/zookeeper1/data
  clientPort=2181
  server.1=192.168.1.100:2881:3881
  server.2=192.168.1.100:2882:3882
  server.3=192.168.1.100:2883:3883
  

cp -R zookeeper1 zookeeper2
cp -R zookeeper1 zookeeper3

#同理更新 conf/zoo.cfg 的 clientPort 为 2182 和 2183

echo 1 > zookeeper1/data/myid
echo 2 > zookeeper2/data/myid
echo 3 > zookeeper3/data/myid

#启动 zookeeper
zookeeper1/bin/zkServer.sh start
zookeeper2/bin/zkServer.sh start
zookeeper3/bin/zkServer.sh start

#检查 zookeeper 是否启动成功
netstat -tnlp|grep 218

#对应 zookeeper 端口
zookeeper1  1  2181  2881 3881
zookeeper2  2  2182  2882 3882
zookeeper3  3  2183  2883 3883


##3、下载 kafka  https://kafka.apache.org/downloads
wget https://archive.apache.org/dist/kafka/2.0.1/kafka_2.11-2.0.1.tgz

##4、部署 kafka 集群
tar -xzvf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0 kafka1

#编辑 kafka 配置文件
vim kafka1/config/server.properties
  broker.id=1
  listeners=PLAINTEXT://:9091
  advertised.listeners=PLAINTEXT://192.168.1.100:9091
  num.network.threads=3
  num.io.threads=8
  socket.send.buffer.bytes=102400
  socket.receive.buffer.bytes=102400
  socket.request.max.bytes=104857600
  log.dirs=/tmp/kafka-logs1
  num.partitions=1
  num.recovery.threads.per.data.dir=1
  offsets.topic.replication.factor=1
  transaction.state.log.replication.factor=1
  transaction.state.log.min.isr=1
  log.retention.hours=168
  log.segment.bytes=1073741824
  log.retention.check.interval.ms=300000
  zookeeper.connect=192.168.1.100:2181,192.168.1.100:2182,192.168.1.100:2183
  zookeeper.connection.timeout.ms=6000
  group.initial.rebalance.delay.ms=0
  

#复制 kafka1
cp -R kafka1 kafka2
cp -R kafka1 kafka3

#同理修改 config/server.properties 中的 listeners 端口,borker.id=2 和 3, 还有 log.dirs

#启动Kafka 集群
cd ../kafka1
bin/kafka-server-start.sh -daemon config/server.properties
cd ../kafka2
bin/kafka-server-start.sh -daemon config/server.properties
cd ../kafka3
bin/kafka-server-start.sh -daemon config/server.properties

#检查 Kafka 状态
netstat -tnlp|grep 909


2、kafka 监控


#安装 kafka-eagle  https://www.kafka-eagle.org/ , 直接下载最新版即可
wget https://codeload.github.com/smartloli/kafka-eagle-bin/tar.gz/refs/tags/v3.0.1
tar -xzvf kafka-eagle-bin-3.0.1.tar.gz
cd kafka-eagle-bin
tar -xzvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 /data/kafka-ui/

cd efak-web-3.0.1
vim bin/ke.sh
  #在第一行添加如下内容
  export KE_HOME=/data/kafka-ui/efak-web-3.0.1
  
  #KE_JAVA_OPTS 可根据需要调整

vim conf/system-config.properties
  efak.zk.cluster.alias=cluster1
  cluster1.zk.list=192.168.1.100:2181,192.168.1.100:2182,192.168.1.100:2183
  efak.driver=org.sqlite.JDBC
  efak.url=jdbc:sqlite:/data/kafka-ui/efak-web-3.0.1/db/ke.db
  efak.username=root
  efak.password=123456
  


#启动 kafka-eagle
bin/ke.sh
#可以正常启动,但访问打不开页面



#开启 kafka JMX
vim kafka1/bin/kafka-server-start.sh
  
  if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxrem
ote.port=9991 -Djava.rmi.server.hostname=192.168.1.100 -Dcom.sun.management.jmxremote.rmi.port=9981 
"
  fi
  
#另外 kafka2 和 kafka3 同样打开 JMX 对应的端口
kafka1   1   9091  9991  9981
kafka2   2   9092  9992  9982
kafka3   3   9093  9993  9983

修改完后再次启动 kafka-eagle 就可正常访问了

kafka 集群安装部署、监控与参数优化
kafka 集群安装部署、监控与参数优化
kafka 集群安装部署、监控与参数优化
kafka 集群安装部署、监控与参数优化

另外 kafka 也可以使用 jconsole 来查看相关数据;通过 zabbix 来收集运行时数据

3、kafka 优化

目前我们只是把 zookeeper 集群 和 kafka 集群搭建起来了,也没有修改特殊的配置
基于此怎么判断当前 kafka 能支撑多大吞吐量?稳定性怎么样?都是不确定的
有没有什么可以测试吞吐量的呢?还真有


#测试 kafka 生产者的吞吐量
--topic 指定 kafka 集群的 topic 名称 本例为 test
-num-records 总共需要发送的消息数 本例为 1000W
-record-size 一条信息有多大,单位是实节  本例为 1kb
-throughput 每秒钟发送的记录数 本例为 10W
-producer-props bootstrap.servers=xxx  kafka 集群的 broker 地址
#示例: (本例 topic 一个分区一个副本)
bin/kafka-producer-perf-test.sh --num-records 10000000 --record-size 1024 --topic test --throughput 100000  --producer-props bootstrap.servers=192.168.1.100:9091,192.168.1.100:9092,192.168.1.100:9093

 
kafka 集群安装部署、监控与参数优化
kafka 集群安装部署、监控与参数优化

测试结果:

本例中一共写入 1 千万条消息
平均是 30763.929907 条消息/秒
每秒向 Kafka 写入了 30.04MB 的数据
每次写入的平均延迟为 997.20.毫秒
最大的延迟为 3417.00 毫秒
50%消息延迟 1008 毫秒,95%消息延迟 1882 毫秒
99%消息延迟 2570 毫秒,99.9%消息延迟 3119 毫秒

测试消费的吞吐量
–topic 指定 kafka 集群的 topic 名称 本例为 test
-fetch-size 指定每次 fetch 的数据的大小,本例为 1048576 即 1M
-messages 总共要消费的消息个数,本例为 10000000,1000w
–threads 指定消费的线程数为 10
bin/kafka-consumer-perf-test.sh –broker-list 192.168.1.100:9091,192.168.1.100:9092,192.168.1.100:9093 –topic test –messages 10000000 –fetch-size 1048576 –threads 10

 
kafka 集群安装部署、监控与参数优化

测试结果:
开始时间 2023-11-26 04:09:33:172
结束时间 2023-11-26 04:10:01:650
共消费数据 9765.6254MB
吞吐量 342.9182MB/S
共消费数据 10000108 条(1000w)
平均每秒消费 351152.0472 条
重平衡时间 43 毫秒
拉取时间 28435 毫秒
每秒拉取消息数据 343.4368M/S
每秒拉取消息数量 351683.0666 条/秒

producer.properties 中影响参数:

batch.size
每当发送多个消息时,为了提高客户端和服务器的性能,生产者将尝试对多个消息进行打包成批,保证这一批可以在同一个分区内。默认为 16384【16KB】
单条消息超过 batch.size,Producer 有可能不会处理此消息
batch.size 过大,有可能会造成 Producer 端内存空间的浪费
batch.size 过小,频繁的网络 IO 会降低 Producer 的吞吐

linger.ms
如果消息迟迟没有达到 batch.size,那么将尝试等待 linger.ms 时间发送。默认等待时间为 0,也就是当消息到达之后立即发送

测试时可将参数 batch.size 扩大一倍, linger.ms 设置为 3000 并单独测试; 最后同时合并修改参数做测试

kafka 集群安装部署、监控与参数优化

调整参数后生产者的吞吐量提升了不少

compression.type
该参数对 Producer 生产的数据进行压缩,主要针对批数据压缩。默认是 none【无压缩】,可以用来设置的值
gzip/snappy/lz4/zstd

本例使用 zstd 做测试

kafka 集群安装部署、监控与参数优化
提升效果不太明显,可以根据需要使用不同的压缩方式

ack
“0”:当消息调用 send()发送出去之后就表示消息已经发送成功,不管消息是否已经到达 broker
“1”:消息发送后,Leader 接收到消息并记录到本地之后,不需要同步数据到副本就能进行 ack 返回
“all”:当消息在 Leader 接收记录,并且等待副本数据同步完成之后,才会返回 ack。该级别也属于 Java#Producer 的默认配置

kafka 集群安装部署、监控与参数优化
 

server.properties 参数调整


num.recovery.threads.per.data.dir=2

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2


log.retention.check.interval.ms=300000


zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0
message.max.bytes=10485760
replica.fetch.max.bytes=10485760

socket.receive.buffer.bytes=1024000  
socket.request.max.bytes=469296128   
max.request.size=1024000


controlled.shutdown.enable=true
controlled.shutdown.max.retries=5
controlled.shutdown.retry.backoff.ms=30000

delete.topic.enable=true
auto.create.topics.enable=true

其它还有些参数,建议在使用过程中根据实际情况来调整

© 版权声明

☆ END ☆
喜欢就点个赞吧
点赞0 分享
图片正在生成中,请稍后...