Kafka

基本概念

kafka

在0.9之前的版本中,offset存放在ZK中,在0.9之后的版本中,存放在kafka cluster中,原因是因为消费者本身就需要和kafka集群打交道,而没必要频繁地和ZK进行通信。

在ZK中,会产生一个Controller的节点,存储kafka集训相关的信息,最先启动的kafka机器充当集群的老大,主要和zookeeper做协调和交互。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Producer :消息生产者,就是向 kafka broker 发消息的客户端;

Consumer :消息消费者,向 kafka broker 取消息的客户端;

Consumer Group :消费者组,由多个 consumer 组成。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。

Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic

Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上
一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;

Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失且 kafka 仍然能够继续工作
kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本一个 leader 和若干个 follower。

leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。
leader 发生故障时,某个 follower 会成为新的 follower。

kafka的消费原理和RocketMQ相似,和消费者所属的组相关,如果所有的消费者都属于一个消费者组,那么一个消费者能消费到该消息,如果均属于不同的组,所有消费者均能消费到该消息。

安装

官网地址

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
#1.解压安装包
[root@localhost software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C
/opt/module/

#2.修改解压后的文件名称
[root@localhost module]$ mv kafka_2.11-0.11.0.0/ kafka

#3.在/opt/module/kafka 目录下创建 logs 文件夹
[root@localhost kafka]$ mkdir logs

#4.修改配置文件
[root@localhost kafka]$ cd config/
[root@localhost config]$ vi server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#修改以下内容:
#port = 9092 默认对外提供的端口号是9092

#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行【数据】存放的路径,不是日志!日志默认会放在logs目录中
#需要配置和logs不同的路径,才能达到data和logs分离存储
log.dirs=/opt/module/kafka/data
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址 + /kafka表示使用zk的/kafka目录
zookeeper.connect=192.168.163.128:2181,192.168.163.129:2181,192.168.163.130:2181
1
2
3
4
5
6
7
8
9
10
#5.配置环境变量
[root@localhost module]$ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@localhost module]$ source /etc/profile

#6.发送安装包到另外两台机器(129,130)
#记得修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2
#注:broker.id 不得重复

启动和关闭

谁先启动,谁就是集群中的Master

1
2
3
4
5
6
#启动集群 -daemon 用守护进程启动,后台运行,启动需要指定配置文件
[root@localhost kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
[root@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
1
2
3
4
#关闭集群
[root@localhost kafka]$ bin/kafka-server-stop.sh stop
[root@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[root@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

命令操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#2.创建 topic
[root@localhost kafka]$ bin/kafka-topics.sh --zookeeper 192.168.163.129:2181 --create --replication-factor 3 --partitions 1 --topic first
#选项说明:
#--topic 定义 topic 名
#--replication-factor 定义副本数
#--partitions 定义分区数

#2.查看当前服务器中的所有 topic
[root@localhost kafka]$ bin/kafka-topics.sh --zookeeper 192.168.163.129:2181 --list

#3.查看某个 Topic 的详情
[root@localhost kafka]$ bin/kafka-topics.sh --zookeeper 192.168.163.129:2181 --describe --topic first

#4.修改分区数
[root@localhost kafka]$ bin/kafka-topics.sh --zookeeper 192.168.163.129:2181 --alter --topic first --partitions 6

#5.删除 topic
#需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
[root@localhost kafka]$ bin/kafka-topics.sh --zookeeper 192.168.163.129:2181 --delete --topic first


#6.发送消息
[root@localhost kafka]$ bin/kafka-console-producer.sh --broker-list 192.168.163.129:9092 --topic first
>hello world
>atguigu atguigu

#7.消费消息
# 旧版使用【--zookeeper】 新版推荐使用 【--bootstrap-server】
# 【--from-beginning】:会把主题中以往所有的数据都读取出来。
[root@localhost kafka]$ bin/kafka-console-consumer.sh --zookeeper 192.168.163.129:2181 --topic first
[root@localhost kafka]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.163.129:9092 --topic first
[root@localhost kafka]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.163.129:9092 --from-beginning --topic first

kafka默认采用轮训的方式决定消息发送到哪个分区,主题持久化在磁盘中,offset也存储在kafka集群本地磁盘中

可以通过查看之前配置的data目录验证:

1
2
3
4
5
6
#主题分区的存储  主题名称:first  分区编号:0
drwxr-xr-x. 2 root root 141 2月 27 00:13 first-0

#__consumer_offsert是一个特殊的topic,用来存储offset
drwxr-xr-x. 2 root root 141 2月 27 00:16 __consumer_offsets-6
drwxr-xr-x. 2 root root 141 2月 27 00:16 __consumer_offsets-9

0.9前的kafka版本中,topic的各个partition的各个group消费情况:【offsets】放在了zookeeper中,新版把这个offsert保存到了一个__consumer_offserttopic下 ,默认是有50个分区,通过轮训的编号打散在各个broker中。

SHELL免密登录

通过129机器生成ssh密钥,下发公钥到130和131机器上,进行免密登录配置。

1
2
3
4
5
6
7
8
9
#1.进入ssh文件夹,若没有这个文件夹,则先执行一次ssh命令
[root@localhost ~]$ cd ~/.ssh/

#2.运行以下命令后敲三下回车创建公私钥
[root@localhost ~]$ ssh-keygen -t rsa

#3.拷贝 id_rsa文件中的内容到130和131机器上,第一次需要输入密码
# 运行完后,此时130机器上相同目录下会多一个 authorized_keys的文件
[root@localhost ~]$ ssh-copy-id 192.168.163.130

批量拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#安装rsync命令,sync只对差异文件做更新,性能更快,scp是把所有文件都复制过去。
#三台机器都需要装
[root@localhost ~]$ yum -y install rsync

#基本语法:
命令 选项参数 要拷贝的文件路径/名称 目的用户@主机:目的路径/名称
rsync -rvl $pdir/$fname $user@host:$pdir/$fname

-r 递归
-v 显示复制过程
-l 拷贝符号连接

#案例
[root@localhost opt]$ rsync -rvl /opt/module/kafka/ root@192.168.1.130:/opt/module/kafka/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#创建批量拷贝脚本
[root@localhost bin]$ vi xsync.sh

#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

#2 basename 获取文件名称
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 dirname 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 获取当前用户名称
user=`whoami`

#5 循环 129同步到130,131机器上
for i in 192.168.163.130 192.168.163.131
do
echo "========== $i =========="
rsync -rvl $pdir/$fname $user@$i:$pdir
done


#赋予权限
[root@localhost bin]$ chmod 777 xsync.sh
[root@localhost bin]$ sh xsync.sh /opt/module/kafka/

群起脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@localhost ~]$ vi kk.sh
#!bin/bash
case $1 in
"start"){
for i in 192.168.163.129 192.168.163.130 192.168.163.131
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
done
};;

"stop"){
for i in 192.168.163.129 192.168.163.130 192.168.163.131
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-stop.sh'
done
};;
esac

最后更新: 2021年02月28日 18:13

原始链接: https://midkuro.gitee.io/2021/02/26/kafka-base/

× 请我吃糖~
打赏二维码