Kafka 架构维度

整体架构

kafka

分区概念

kafka

批次消费

kafka

kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1.消息默认保留的天数:7天
2.默认开启异步自动提交,间隔时间5秒
3.Consumer默认拉取条数限制:500条,一次poll会取多个分区的数据
4.生产者携带key的分区策略默认使用Hash分区器
5.消费者关联partition的分区分配策略默认使用Range
6.zk的Controller节点是个临时节点,谁先向zk里写,谁就是controller
7.zk的broker节点是个永久节点,存储partition等相关信息,当一个broker下线时,其他broker需要获取它的信息
8.producer只能向partition的leader推送数据
9.kakfa持久化方式:data(log) + index(offset、timestamp)
10.ISR和OSR的默认超时阈值时间:10秒
11.ACK的默认值:1 (0,1,-1)
12.如果开启自动提交,然后自己代码手动提交,也会提交
13.index和timestamp文件存储的都是范围数据,二分大致定位offset之后去log文件按照偏移量偏移后再定位精确数据
14.如果ISR节点变少或者服务下线,消费者会造成一定时间的阻塞
15.如果ISR增加节点,则不会造成阻塞,平滑使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/*
创建TOPIC
kafka-topics.sh --zookeeper node03:2181/kafka --create --topic msb-items --partitions 2 --replication-factor 2
*/

@Test
public void producer() throws ExecutionException, InterruptedException {

String topic = "msb-items";
Properties p = new Properties();
p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092");
//kafka 持久化数据的MQ 数据-> byte[],不会对数据进行干预,双方要约定编解码
//kafka是一个app::使用零拷贝 sendfile 系统调用实现快速数据消费
p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);

//现在的producer就是一个提供者,面向的其实是broker,虽然在使用的时候我们期望把数据打入topic

/*
msb-items
2partition
三种商品,每种商品有线性的3个ID
相同的商品最好去到一个分区里
*/

while(true){
for (int i = 0; i < 3; i++) {
for (int j = 0; j <3; j++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item"+j,"val" + i);
Future<RecordMetadata> send = producer
.send(record);

RecordMetadata rm = send.get();
int partition = rm.partition();
long offset = rm.offset();
System.out.println("key: "+ record.key()+" val: "+record.value()+" partition: "+partition + " offset: "+offset);

}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Test
public void consumer(){
/*
kafka-consumer-groups.sh --bootstrap-server node02:9092 --list
*/

//基础配置
Properties p = new Properties();
p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node02:9092,node03:9092,node01:9092");
p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

//消费的细节
p.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"OOXX");
//KAKFA IS MQ IS STORAGE
p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//第一次启动,米有offset
/**
* "What to do when there is no initial offset in Kafka or if the current offset
* does not exist any more on the server
* (e.g. because that data has been deleted):
* <ul>
* <li>earliest: automatically reset the offset to the earliest offset
* <li>latest: automatically reset the offset to the latest offset</li>
* <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>
* </ul>";
*/
p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交时异步提交,丢数据&&重复数据
//一个运行的consumer ,那么自己会维护自己消费进度
//一旦你自动提交,但是是异步的
//1,还没到时间,挂了,没提交,重起一个consuemr,参照offset的时候,会重复消费
//2,一个批次的数据还没写数据库成功,但是这个批次的offset背异步提交了,挂了,重起一个consuemr,参照offset的时候,会丢失消费

// p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");//5秒
// p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // POLL 拉取数据,弹性,按需,拉取多少?
}

消费者rebalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
//kafka 的consumer会动态负载均衡 -- ConsumerRebalanceListener
consumer.subscribe(Arrays.asList("msb-items"), new ConsumerRebalanceListener() {
//在rebalance之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("---onPartitionsRevoked:");
Iterator<TopicPartition> iter = partitions.iterator();
while(iter.hasNext()){
System.out.println(iter.next().partition());
}

}

//在rebalance之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("---onPartitionsAssigned:");
Iterator<TopicPartition> iter = partitions.iterator();

while(iter.hasNext()){
System.out.println(iter.next().partition());
}
}
});

三种手动commit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
while(true){
/**
* 常识:如果想多线程处理多分区
* 每poll一次,用一个语义:一个job启动
* 一次job用多线程并行处理分区
* 且,job应该被控制是串行的
* 以上的知识点,其实如果你学过大数据
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));// 0~n

if(!records.isEmpty()){
//以下代码的优化很重要
System.out.println("-----------"+records.count()+"-------------");
Set<TopicPartition> partitions = records.partitions(); //每次poll的时候是取多个分区的数据
//且每个分区内的数据是有序的

/**
* 如果手动提交offset
* 1,按消息进度同步提交
* 2,按分区粒度同步提交
* 3,按当前poll的批次同步提交
*
* 思考:如果在多个线程下
* 1,以上1,3的方式不用多线程
* 2,以上2的方式最容易想到多线程方式处理,有没有问题? 没有!分区多线程offset不冲突
*/
for (TopicPartition partition : partitions) {
List<ConsumerRecord<String, String>> pRecords = records.records(partition);
//流式计算:pRecords.stream().sorted()
//在一个微批里,按分区获取poll回来的数据
//线性按分区处理,还可以并行按分区处理用多线程的方式
Iterator<ConsumerRecord<String, String>> piter = pRecords.iterator();
while(piter.hasNext()){
ConsumerRecord<String, String> next = piter.next();
int par = next.partition();
long offset = next.offset();
String key = next.key();
String value = next.value();
long timestamp = next.timestamp();


System.out.println("key: "+ key+" val: "+ value+ " partition: "+par + " offset: "+ offset+"time:: "+ timestamp);

TopicPartition sp = new TopicPartition("msb-items", par);
OffsetAndMetadata om = new OffsetAndMetadata(offset);
HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
map.put(sp,om);

//这个是最安全的,每条记录级的更新,第一种
consumer.commitSync(map);
//单线程,多线程,都可以
}


/**
* 因为你都分区了
* 拿到了分区的数据集
* 期望的是先对数据整体加工
* 小问题会出现? 你怎么知道最后一条消息的offset?!!!!
* 感觉一定要有,kafka,很傻,你拿走了多少,我不关心,你告诉我你正确的最后一个消息offset
*/

long poff = pRecords.get(pRecords.size() - 1).offset();//获取分区内最后一条消息的offset
OffsetAndMetadata pom = new OffsetAndMetadata(poff);
HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
map.put(partition,pom);
consumer.commitSync(map);//这个是第二种,分区粒度提交offset
}
//这个就是按poll的批次提交offset,第3种
consumer.commitSync();
}
}

按照时间戳定位offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 以下代码在未来开发的时候,向通过自定时间点的方式,自定义消费数据位置
*
* 其实本质,核心知识是seek方法
*
* 举一反三:
* 1,通过时间换算出offset,再通过seek来自定义偏移
* 2,如果你自己维护offset持久化~!!!通过seek完成
*
*/

Map<TopicPartition, Long> tts =new HashMap<>();
//通过consumer取回自己分配的分区 as
Set<TopicPartition> as = consumer.assignment();

//自己填充一个hashmap,为每个分区设置对应的时间戳
for (TopicPartition partition : as) {
tts.put(partition,1610629127300L);
}
//通过consumer的api,取回timeindex的数据
Map<TopicPartition, OffsetAndTimestamp> offtime = consumer.offsetsForTimes(tts);


for (TopicPartition partition : as) {
//通过取回的offset数据,通过consumer的seek方法,修正自己的消费偏移
OffsetAndTimestamp offsetAndTimestamp = offtime.get(partition);
//如果不是通过time 换 offset,如果是从mysql读取回来,其本质是一样的
long offset = offsetAndTimestamp.offset();
System.out.println(offset);

//本质,consumer的seek偏移offset
consumer.seek(partition,offset);

}

持久化方式

kafka

ACK

kafka

kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
0,另外一个trade off:
不要强调磁盘的可靠性,转向异地多机的同步

1,如果拿磁盘做持久化,在做trade off
优先pagecache或者绝对磁盘

2,在多机集群分布式的时候
tradeoff
强一致,最终一致性(过半,ISR)

像redis,宁可用HA,不用刻意追求AOF准确性
像kafka,我们追求ack -1,不要太追求磁盘的可靠性
还有trade off ,就是在HA场景下,如果有实例异常退出,是否需要立刻尝试重启
1
2
3
像Redis的主从集群,两台从机一台主机,如果从机异常退出了,在短时间内又加回集群,这时候只需要和主机做增量同步即可。
如果时间过长,需要进行全量的RDB同步,需要和主机fork复制,会影响主机的性能
如果有脚本频繁对有问题的从机进行重启,会拖垮主机造成雪崩现象

kafka 的IO情况

kafka

偏移Seek

1
2
3
Topic: msb-items PartitionCount:2 ReplicationFactor:3 Configs:
Topic: msb-items Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: msb-items Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2

kafka

kafka

最后更新: 2021年03月08日 10:47

原始链接: https://midkuro.gitee.io/2021/02/28/kafka-frame/

× 请我吃糖~
打赏二维码