Kafka集群配置与管理

简介

Apache Kafka 是一种分布式流处理平台,最初由 LinkedIn 开发,现在是 Apache 软件基金会的顶级项目之一。它是一个高吞吐量的分布式发布-订阅消息系统,设计用于处理大规模的实时数据流。

主要特性:

    1. 分布式系统: Kafka 是一个分布式系统,可以跨多个节点进行水平扩展,以处理大量数据。
    2. 高吞吐量: Kafka 能够处理大规模数据流,每秒可以处理数百万条消息。
    3. 持久性存储: Kafka 使用可持久性存储,消息被持久化在磁盘上,因此即使消费者离线一段时间,它们也能够获取之前的消息。
    4. 水平扩展: Kafka 的设计允许在需要时轻松地水平扩展,以适应不断增长的负载。
    5. 多订阅者: 多个订阅者(消费者)可以同时订阅主题,每个订阅者都可以独立地消耗消息。
  1. 可靠性: Kafka 保证消息传递的可靠性,确保消息不会在传输过程中丢失。

组件:

  1. Producer(生产者): 负责将消息发布到 Kafka 的主题。
  2. Consumer(消费者): 从 Kafka 主题中订阅并处理消息。
  3. Broker(代理): Kafka 集群中的每个节点都是一个代理,用于存储消息并处理生产者和消费者的请求。
  4. Topic(主题): 消息的类别标签,允许生产者发布消息并消费者订阅消息。
  5. Partition(分区): 每个主题可以分成多个分区,以便实现水平扩展和并行处理。

Apache Kafka 已经成为许多企业用于构建实时数据管道和处理大规模数据流的首选解决方案。

标准化

  配置项   值
当前版本 2.11-1.1.0
部署路径 /usr/local/kafka
配置文件 /usr/local/kafka/config/server.properties
数据路径 /data/kafka/kafka-logs
日志路径 /data/kafka/kafka-logs
JVM 配置 export KAFKA_HEAP_OPTS=”-Xmx6G -Xms6G”

export JMX_PORT=”9999″

启停方式
systemctl stop|stop kafka
端口 9092

1. zookeeper 修改 /usr/local/zookeeper/bin/zkEnv.sh 脚本中的 ZOO_LOG_DIR 变量为 /data/logs/zookeeper/
2. kafka 修改/usr/local/kafka/bin/kafka-server-start.sh 脚本中的 KAFKA_HEAP_OPTS 变量为 -Xmx6G -Xms6G,添加 JMX_PORT 变量为 9999
3. 如果按照通用标准配置 JVM,服务器配置至少需要 12G 内存,具体情况可根据业务消息数据来进行调整

安装


wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz -P /usr/local/src/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz -P /usr/local/src/
tar -xf /usr/local/src/zookeeper-3.4.12.tar.gz -C /usr/local/
tar -xf /usr/local/src/kafka_2.11-1.1.0.tgz -C /usr/local/
ln -s /usr/local/kafka_2.11-1.1.0 /usr/local/kafka
ln -s /usr/local/zookeeper-3.4.12 /usr/local/zookeeper
mkdir -p /data/zookeeper
mkdir -p /data/logs/zookeeper
chown -R appsvc. /data/
chown -R appsvc. /usr/local/kafka
chown -R appsvc. /usr/local/zookeeper
cp -a /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties-bak

 

配置

单机配置


cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
#启动 zk
systemctl start zookeeper
 
#导入 kafka 配置文件
cat > /usr/local/kafka/config/server.properties << EOF
#监听地址
listeners=PLAINTEXT://IP 地址:9092
#默认 partitions 数量
num.partitions=1
#数量保留时间
log.retention.hours=72
#副本数量
default.replication.factor=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#zk 连接地址及目录
zookeeper.connect=IP 地址:2181/kafka
broker.id=0
num.network.threads=5
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
delete.topic.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF
 
#启动 kafka
systemctl start kafka

 

集群配置

#生产 zk 配置文件
cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.0=192.168.162.69:20881:30881
server.1=192.168.162.70:20881:30881
server.2=192.168.162.71:20881:30881
EOF
#生成 myid
echo 0 >  /data/zookeeper/myid
echo 1 >  /data/zookeeper/myid
echo 2 >  /data/zookeeper/myid
#启动 zk
systemctl start zookeeper
 
#生产 kafka 配置文件
#节点 1
vim /usr/local/kafka/config/server.properties
#修改 broker.id 值
broker.id=0
#监听地址
listeners=PLAINTEXT://192.168.162.69:9092
#默认 partitions 数量
num.partitions=1
#数量保留时间
log.retention.hours=72
#副本数量
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
#zk 连接地址及目录
zookeeper.connect=192.168.162.69:2181,192.168.162.70:2181,192.168.162.71:2181/kafka
num.network.threads=5
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
delete.topic.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
 
#节点 2
#修改 broker.id 值
broker.id=1
#监听地址
listeners=PLAINTEXT://192.168.162.70:9092
#默认 partitions 数量
num.partitions=1
#数量保留时间
log.retention.hours=72
#副本数量
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
#zk 连接地址及目录
zookeeper.connect=192.168.162.69:2181,192.168.162.70:2181,192.168.162.71:2181/kafka
num.network.threads=5
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
delete.topic.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
 
#节点 3
#修改 broker.id 值
broker.id=2
#监听地址
listeners=PLAINTEXT://192.168.162.71:9092
#默认 partitions 数量
num.partitions=1
#数量保留时间
log.retention.hours=72
#副本数量
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
#zk 连接地址及目录
zookeeper.connect=192.168.162.69:2181,192.168.162.70:2181,192.168.162.71:2181/kafka
num.network.threads=5
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
delete.topic.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
 
#启动 kafka
systemctl start kafka

 

启停

zookeeper 启停


#添加启停脚本
cat > /usr/lib/systemd/system/zookeeper.service << EOF
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
User=appsvc
Group=appsvc
LimitNOFILE=655350
Environment="PATH=/opt/jdk/bin:/opt/jdk/bin:/opt/openresty/nginx/sbin:/opt/node/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/root/bin"
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
EOF
 
#启动
systemctl daemon-reload
systemctl start zookeeper
systemctl enable zookeeper

kafka 启停

#添加启停脚本
cat > /usr/lib/systemd/system/kafka.service << EOF
[Unit]
Description=Kafka service
After=network.target
  
[Service]
User=appsvc
Group=appsvc
LimitNOFILE=655350
Environment="PATH=/opt/jdk/bin:/sbin:/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/root/bin"
SyslogIdentifier=zookeeper
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
EOF
 
#启动
systemctl daemon-reload
systemctl start kafka
systemctl enable kafka

 

© 版权声明

☆ END ☆
喜欢就点个赞吧
点赞0 分享
图片正在生成中,请稍后...