Kafka
Broker
Broker 工作流程
- zk存储的信息
总体工作流程
- 每台Broker启动后在zk中注册,三台启动后产生三个:0、1、2
- 谁先抢占controller,谁成为controller,成为controller后监控/brokers/ids目录,检测brokers节点是否产生变化——控制器管理着整个集群中分区以及副本的状态,控制器的选举需要依赖于Zookeeper
- 选举分区的leader:
- kafka中,分区的leader的选举操作也通过controller完成,选举一般在创建topic的时候和leader上下线的时候
- kafka中默认的leader的选举策略是OfflinePartitionLeaderElectionStrategy,从AR中按顺序查找第一个存活的副本(存活的副本的集合为ISR),副本必须在ISR中,如果不进行分区的重新分配,AR中的副本以及顺序是不变的,但是ISR会变。一般来说,Leader就是优先副本
- 底层以log形式存储,分为不同的Segment(大小为1G)。Segment内部采用索引的方式查询
节点的服役和退役
服役
启动新的一个服务器(假设已经启动了hadoop102、hadoop103、hadoop104)
创建一个要均衡的topic
1
2
3
4
5
6
7$ vim topics-to-move.json
{
"topics": [
{"topic": "first"}
],
"version": 1
}生成一个负载均衡计划
1
2
3
4
5$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment
...
Proposed partition reassignment configuration
...创建副本存储计划(数据存储在broker0、),该计划即为上一个命令的输出
Proposed partition reassignment configuration
1
2
3
4$ vim increase-replication-factor.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
any","any"]}]}执行副本存储计划
1
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本存储计划
1
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
退役
退役hadoop105
与节点服役类似,创建一个要均衡的topic,然后创建执行计划(原本的broker-list为0,1,2,3)
1
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
复制上面的输出,创建副本存储计划
increase-replication-factor.json
并执行,最后停止hadoop105的kafka1
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
Kafka副本
基本信息
- 提高数据可靠性,Kafka默认副本为1个,生产环境一般配置为2个
- Kafka中副本分为:Leader和Follower——Hadoop中的副本等价,不区分leader、follower
- 生产者和消费者的操作对象为Leader
- Follower通过Leader同步数据
- 分区中的所有副本统称为AR(Assigned Repllicas),AR = ISR + OSR
- ISR:和Leader保持同步的Follower集合(包括Leader本身)。Follower长时间不与Leader通信,则被踢出 ISR。时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader挂掉后从ISR选举新Leader
- OSR:Follower与Leader副本同步时,延迟过多的副本
选举流程
有一个broker的Controller会被选举为Controller Leader(抢占式),管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作
选择ISR中的、位于AR前面的节点作为Leader
故障处理
Follower故障
* LEO(Log End Offset):每个副本的最后一个offset,LEO是最新的offset+1 * HW(High Watermark):所有副本中最小的LEO,HW之前的数据才对Consumer可见 * Follower发生故障后会被临时踢出ISR,期间Leader和其他Follower继续接收数据;该Follower恢复后,会读取本地磁盘记录的上次的HW,将log文件高于HW的部分截掉,从HW向Leader同步,等该Follower的LEO大于等于该Partition的HW,该Follower就追上了,可以重新加入ISR * 上图紫色为新读取的数据,broker2本来有数据5、6,挂掉后,broker0、1继续读取数据,HW移动到8(此时broker0读取到9,broker1同步到7),broker2重连,将高于原HW的数据5、6删除,同步5、6、7,此时broker2的LEO大于等于当前的HW,broker2可以重新加入ISRLeader故障
- Leader发生故障后,从ISR选出一个新的Leader,其余的Follower先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据
- 只能保证副本之间的数据一致性,不能保证数据不丢失或者不重复(例如,上一个leader处理到5、6、7,但新leader只保留数据到4)
分区副本的分配
kafka的分区数大于服务器台数:Leader按循环分配到不同的broker,保证负载均衡
1
$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic test
手动调整分区副本存储:创建一个新的topic,4个分区两个副本,将topic的所有副本存储到broker0、broker1
创建topic
1
$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic testnew
创建副本存储计划, 所有副本指定存储在broker0、broker1,执行
1
2
3
4
5
6
7
8
9$ vim increase-replication-factor.json
{
"version":1,
"partitions":[{"topic":"testnew","partition":0,"replicas":[0,1]},
{"topic":"testnew","partition":1,"replicas":[0,1]},
{"topic":"testnew","partition":2,"replicas":[1,0]},
{"topic":"testnew","partition":3,"replicas":[1,0]}]
}
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
Leader Partition负载平衡:
Kafka会自动把Leader Partition均匀分散在各个机器上,保证每台机器的读写吞吐量都是均匀的
如果某些broker宕机,会导致Leader Partition重新选举后集中在其他少部分几台broker,宕机的broker重启之后会重新成为Follower,造成集群负载不均衡
通过参数设置再平衡:
增加副本数目
创建topic(一个副本)
1
$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic testAdd
手动增加副本(3个副本):创建副本存储计划并执行
1
2
3
4
5
6
7
8
9
10$ vim increase-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"testAdd","partition":0,"replicas":[0,1,2]},
{"topic":"testAdd","partition":1,"replicas":[0,1,2]},
{"topic":"testAdd","partition":2,"replicas":[0,1,2]}
]
}
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
文件存储
文件存储机制
Topic是逻辑的概念,partition是物理的概念,每个partition对应一个log文件,log文件中存储的Producer发送的数据,数据追加到log文件末端
Kafka采取了分片和索引机制,每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件,位于一个文件夹下,文件夹的命名规则为:topic名称+分区序号,例如:first-0
- .log:日志文件
- .index:偏移量索引文件
- .timeindex:时间戳索引文件
- index和log文件以当前segment的第一条消息的offset命名
Log文件与Index文件:
文件清理策略
默认的日志保存时间为7天
- log.retention.hours,最低优先级,小时为单位,默认 7 天
- log.retention.minutes,优先级高一些,分钟为单位
- log.retention.ms,最高优先级,毫秒为单位
- log.retention.check.interval.ms,检查周期,默认5分钟
delete清理策略:log.cleanup.policy = delete
- 基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳(segment中最后一个记录的时间戳)
- 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment
compact清理策略:log.cleanup.policy = compact
对于相同key的不同value值,只保留最后一个版本
压缩后的offset可能是不连续的,上图中没有offset 6
该策略只适合特殊场景,例如消息的key是用户ID,value是用户的资料,压缩后的消息集保存用户最新的资料
数据读写的高效性
高效:
- Kafka 本身是分布式集群,可以采用分区技术,并行度高
- 读数据采用稀疏索引,快速定位要消费的数据
- 顺序写磁盘(写的过程是追加到文件末端)
- 页缓存 + 零拷贝
零拷贝:Kafka的数据加工操作交给生产者和消费者,Broker应用层不关心存储的数据,因此数据传输时不经过应用层
PageCache页缓存:
- Kafka重度依赖底层操作系统提供的PageCache功能
- 当上层有写操作时,操作系统将数据写入PageCache,kernel space决定什么时候数据落盘
- 当上层有读操作时,先从PageCache查找,找不到再去磁盘中读取
Consumer
消费方式
- pull模式:主动从broker拉取数据,但如果broker没有数据,消费者可能陷入循环(一直返回空数据)
- push模式:由broker决定消息发送速率——很难适应所有消费者的消费速率
工作流程
总体流程:
- Producer发送数据到broker的leader,follower同步数据
- 消费者相互独立
- 每个分区的数据,只能由消费者组里的一个消费者消费——将消费者组视为一个独立的消费者
- 通过offset,记录消费者目前消费到的位置(老版本中offset存储在zk中,现在由系统主题保存到磁盘,减少网络通信)
消费者组
- 消费者组由多个consumer组成,同组的consumer的groupid相同
- 消费者组之间互不影响
- 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
- 如果向消费组中添加更多的消费者,最终超过主题分区数量,则有一部分消费者会闲 置,不接收任何消息
消费者组初始化流程:
* coordinator:辅助消费者组的初始化和分区的分配,每个broker都有一个。节点选择=groupid的hashcode % 50( `__consumer_offsets`的分区数量)。消费者组下的消费者向该coordinator提交offset * 过程: * 组内每个消费者向该coordinator发送加入组的请求 * coordinator随机选出该消费者组的leader,将要消费的topic的情况(该topic的分区情况等信息)发给leader * leader制定消费计划,确定组内各个消费者分别消费的分区号(见后面的分区分配策略) * coordinator下发方案给这个组的所有消费者 * **coordinator会监控消费者的心跳和消费能力**消费者组消费流程:
* 消费者创建消费者网络客户端 * 首先调用sendFetches方法,协调每批次拉取的大小和拉取时间间隔,方法中包含三个参数 * 调用send方法发送请求,通过回调方法onSuccess将结果拉取到消息队列 * 反序列化、拦截器、数据处理
消费者API
独立消费者:
消费topic的数据(要求生产者发送了相应topic的数据)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}消费某个主题的某个分区(结合之前的生产者partition)
1
2
3
4// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);
消费者组:复制一份基础消费者的代码,同时启动即可启动同一个消费者组中的两个消费者
注意,如果想要重复消费已经消费过的数据,要改groupid并且
properties.setProperty("auto.offset.reset", "earliest");
,或者停止当前的消费者线程,重新运行时代码里主动修改offset。因为原来的groupid的offset已经变了!参考kafka常见问题如果想消费已经被消费过的数据
分区的分配和再平衡
消费者组中包含多个消费者,topic包含多个分区——哪个consumer消费哪个partition的数据?
四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky
配置参数partition.assignment.strategy修改分区的分配策略
默认策略是Range + CooperativeSticky——可以同时使用多个分区分配策略
Range 以及再平衡
- Range是对一个topic的
- 对同一个topic的分区按序号排序,对消费者按字母顺序排序
- 如果有7个分区,3 个消费者,排序后分区为:
0,1,2,3,4,5,6
,消费者为C0,C1,C2
,除不尽则前面的消费者消费的分区数目增加 - 即,partitions数/consumer数,决定每个消费者消费几个分区。如果除不尽,前面的消费者将会多消费1个分区
- 如果有7个分区,3 个消费者,排序后分区为:
- 当topic数目增多,C0消费者的分区消费数目,会比其他消费者明显更多,容易产生数据倾斜
- 再平衡:如果停止C0,则会重新按照range方式分配——平衡后,C1消费原来的0、1、3、4号分区数据,C2消费者原来的消费2、5、6号分区数据,新的数据分区后,C1消费0、1、2、3、4号分区,C2消费4、5、6号分区
RoundRobin 以及再平衡
- 把所有partition和组内所有consumer列出,按照hashcode排序,通过轮询算法分配partition到各个消费者
- 再分配:如果C0挂掉,数据轮询内容为0、3、6号分区,平衡后C1消费0、1、4、6号分区数据,C2消费者消费2、3、5号分区数据
Sticky 以及再平衡
- 执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,节省大量的开销
- 与Range相似,但同样的例子里,C0消费者虽然消费三个分区,分区号是随机的
Offset
维护位置
consumer默认将offset保存在Kafka一个内置的topic中:
__consumer_offsets
__consumer_offsets
采用key-value存储数据- key是group.id+topic+分区号,value是当前offset的值
- 每隔一段时间,kafka会对这个topic进行compact,group.id+topic+分区号保留最新数据
如果要查看这个topic,则配置文件
config/consumer.properties
中添加配置exclude.internal.topics=false
, 默认true,表示不能消费系统主题1
$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
自动提交offset
1 | // 自动提交offset |
手动提交offset
commitSync同步提交:阻塞当前线程,发送提交offset请求,一直到提交成功,并且会自动失败重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 手动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
...
// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
// 同步提交 offset
kafkaConsumer.commitSync();
}commitAsync异步提交:发送完提交offset请求后,就开始拉取下一批数据,没有失败重试机制,有可能提交失败——修改为
kafkaConsumer.commitAsync();
即可
指定offset消费
当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量(例如数据已删除)
参数:
auto.offset.reset = earliest | latest | none
,默认是latest- earliest:自动将偏移量重置为最早的偏移量,–from-beginning
- none:未找到消费者组的先前偏移量,则抛出异常
指定任意offset位置开始消费——要先获得消费者对应的分区!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 注册要消费的主题(可以消费多个主题)
List<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 指定位置消费
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配方案(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 5 的位置开始消费
for (TopicPartition tp: assignment) {
kafkaConsumer.seek(tp, 5);
}
重新按照时间消费(指定时间消费)
1 | // 获得分区分配方案 |
漏消费和重复消费
重复消费:已经消费了数据,但是offset没提交(自动提交offset引起)
漏消费:先提交offset后消费,有可能会造成数据的漏消费(offset被提交,但数据还在内存中未落盘,刚好消费者线程被kill,此时offset已经提交但是数据未处理,这部分内存中的数据丢失,下次启动是broker显示已经offset已经移动,但实际上消费者没有这个数据)
消费者事务
- 完成Consumer端的精准一次性消费(不漏消费也不重复消费),需要Kafka消费端将消费过程和提交offset过程做原子绑定——需要将Kafka的offset保存到支持事务的自定义介质(如 MySQL)
- 下游消费者必须支持事务,才能做到精确的一次性消费
数据积压(提高消费者吞吐量)
- 如果消费能力不足,考虑增加Topic的分区数,同时提升消费组的消费者数量,消费者数 = 分区数
- 下游的数据处理不及时:提高每批次拉取的数量