1、kafka 集群安装
kafka 的集群部署方式网上有很多,此处只列出主要安装步骤
#所有都部署到/data 目录下,可根据自己面要安装在不同目录
#本机 ip 为 192.168.1.100
cd /data
##1、先下载 zookeeper https://zookeeper.apache.org/releases.html#download
wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz
##2、部署 zookeeper(本次单机部署 3 个节点伪集群)
tar -xzvf apache-zookeeper-3.8.3-bin.tar.gz
mv apache-zookeeper-3.8.3-bin zookeeper1
cd zookeeper1
mkdir data
mv conf/zoo_simple.cfg conf/zoo.cfg
#编辑 zoo.cfg
vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper1/data
clientPort=2181
server.1=192.168.1.100:2881:3881
server.2=192.168.1.100:2882:3882
server.3=192.168.1.100:2883:3883
cp -R zookeeper1 zookeeper2
cp -R zookeeper1 zookeeper3
#同理更新 conf/zoo.cfg 的 clientPort 为 2182 和 2183
echo 1 > zookeeper1/data/myid
echo 2 > zookeeper2/data/myid
echo 3 > zookeeper3/data/myid
#启动 zookeeper
zookeeper1/bin/zkServer.sh start
zookeeper2/bin/zkServer.sh start
zookeeper3/bin/zkServer.sh start
#检查 zookeeper 是否启动成功
netstat -tnlp|grep 218
#对应 zookeeper 端口
zookeeper1 1 2181 2881 3881
zookeeper2 2 2182 2882 3882
zookeeper3 3 2183 2883 3883
##3、下载 kafka https://kafka.apache.org/downloads
wget https://archive.apache.org/dist/kafka/2.0.1/kafka_2.11-2.0.1.tgz
##4、部署 kafka 集群
tar -xzvf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0 kafka1
#编辑 kafka 配置文件
vim kafka1/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9091
advertised.listeners=PLAINTEXT://192.168.1.100:9091
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.100:2181,192.168.1.100:2182,192.168.1.100:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#复制 kafka1
cp -R kafka1 kafka2
cp -R kafka1 kafka3
#同理修改 config/server.properties 中的 listeners 端口,borker.id=2 和 3, 还有 log.dirs
#启动Kafka 集群
cd ../kafka1
bin/kafka-server-start.sh -daemon config/server.properties
cd ../kafka2
bin/kafka-server-start.sh -daemon config/server.properties
cd ../kafka3
bin/kafka-server-start.sh -daemon config/server.properties
#检查 Kafka 状态
netstat -tnlp|grep 909
2、kafka 监控
#安装 kafka-eagle https://www.kafka-eagle.org/ , 直接下载最新版即可
wget https://codeload.github.com/smartloli/kafka-eagle-bin/tar.gz/refs/tags/v3.0.1
tar -xzvf kafka-eagle-bin-3.0.1.tar.gz
cd kafka-eagle-bin
tar -xzvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 /data/kafka-ui/
cd efak-web-3.0.1
vim bin/ke.sh
#在第一行添加如下内容
export KE_HOME=/data/kafka-ui/efak-web-3.0.1
#KE_JAVA_OPTS 可根据需要调整
vim conf/system-config.properties
efak.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.1.100:2181,192.168.1.100:2182,192.168.1.100:2183
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/data/kafka-ui/efak-web-3.0.1/db/ke.db
efak.username=root
efak.password=123456
#启动 kafka-eagle
bin/ke.sh
#可以正常启动,但访问打不开页面
#开启 kafka JMX
vim kafka1/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxrem
ote.port=9991 -Djava.rmi.server.hostname=192.168.1.100 -Dcom.sun.management.jmxremote.rmi.port=9981
"
fi
#另外 kafka2 和 kafka3 同样打开 JMX 对应的端口
kafka1 1 9091 9991 9981
kafka2 2 9092 9992 9982
kafka3 3 9093 9993 9983
修改完后再次启动 kafka-eagle 就可正常访问了
另外 kafka 也可以使用 jconsole 来查看相关数据;通过 zabbix 来收集运行时数据
3、kafka 优化
目前我们只是把 zookeeper 集群 和 kafka 集群搭建起来了,也没有修改特殊的配置
基于此怎么判断当前 kafka 能支撑多大吞吐量?稳定性怎么样?都是不确定的
有没有什么可以测试吞吐量的呢?还真有
#测试 kafka 生产者的吞吐量
--topic 指定 kafka 集群的 topic 名称 本例为 test
-num-records 总共需要发送的消息数 本例为 1000W
-record-size 一条信息有多大,单位是实节 本例为 1kb
-throughput 每秒钟发送的记录数 本例为 10W
-producer-props bootstrap.servers=xxx kafka 集群的 broker 地址
#示例: (本例 topic 一个分区一个副本)
bin/kafka-producer-perf-test.sh --num-records 10000000 --record-size 1024 --topic test --throughput 100000 --producer-props bootstrap.servers=192.168.1.100:9091,192.168.1.100:9092,192.168.1.100:9093
测试结果:
本例中一共写入 1 千万条消息
平均是 30763.929907 条消息/秒
每秒向 Kafka 写入了 30.04MB 的数据
每次写入的平均延迟为 997.20.毫秒
最大的延迟为 3417.00 毫秒
50%消息延迟 1008 毫秒,95%消息延迟 1882 毫秒
99%消息延迟 2570 毫秒,99.9%消息延迟 3119 毫秒测试消费的吞吐量
–topic 指定 kafka 集群的 topic 名称 本例为 test
-fetch-size 指定每次 fetch 的数据的大小,本例为 1048576 即 1M
-messages 总共要消费的消息个数,本例为 10000000,1000w
–threads 指定消费的线程数为 10
bin/kafka-consumer-perf-test.sh –broker-list 192.168.1.100:9091,192.168.1.100:9092,192.168.1.100:9093 –topic test –messages 10000000 –fetch-size 1048576 –threads 10
测试结果:
开始时间 2023-11-26 04:09:33:172
结束时间 2023-11-26 04:10:01:650
共消费数据 9765.6254MB
吞吐量 342.9182MB/S
共消费数据 10000108 条(1000w)
平均每秒消费 351152.0472 条
重平衡时间 43 毫秒
拉取时间 28435 毫秒
每秒拉取消息数据 343.4368M/S
每秒拉取消息数量 351683.0666 条/秒
producer.properties 中影响参数:
batch.size
每当发送多个消息时,为了提高客户端和服务器的性能,生产者将尝试对多个消息进行打包成批,保证这一批可以在同一个分区内。默认为 16384【16KB】
单条消息超过 batch.size,Producer 有可能不会处理此消息
batch.size 过大,有可能会造成 Producer 端内存空间的浪费
batch.size 过小,频繁的网络 IO 会降低 Producer 的吞吐linger.ms
如果消息迟迟没有达到 batch.size,那么将尝试等待 linger.ms 时间发送。默认等待时间为 0,也就是当消息到达之后立即发送测试时可将参数 batch.size 扩大一倍, linger.ms 设置为 3000 并单独测试; 最后同时合并修改参数做测试
调整参数后生产者的吞吐量提升了不少
compression.type
该参数对 Producer 生产的数据进行压缩,主要针对批数据压缩。默认是 none【无压缩】,可以用来设置的值
gzip/snappy/lz4/zstd本例使用 zstd 做测试
提升效果不太明显,可以根据需要使用不同的压缩方式
ack
“0”:当消息调用 send()发送出去之后就表示消息已经发送成功,不管消息是否已经到达 broker
“1”:消息发送后,Leader 接收到消息并记录到本地之后,不需要同步数据到副本就能进行 ack 返回
“all”:当消息在 Leader 接收记录,并且等待副本数据同步完成之后,才会返回 ack。该级别也属于 Java#Producer 的默认配置
server.properties 参数调整
num.recovery.threads.per.data.dir=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=469296128
max.request.size=1024000
controlled.shutdown.enable=true
controlled.shutdown.max.retries=5
controlled.shutdown.retry.backoff.ms=30000
delete.topic.enable=true
auto.create.topics.enable=true
其它还有些参数,建议在使用过程中根据实际情况来调整