Kafka (2)Broker、Consumer

Kafka

Broker

Broker 工作流程

  • zk存储的信息
image-20220817202613407
  • 总体工作流程

    image-20220817202845878
    • 每台Broker启动后在zk中注册,三台启动后产生三个:0、1、2
    • 谁先抢占controller,谁成为controller,成为controller后监控/brokers/ids目录,检测brokers节点是否产生变化——控制器管理着整个集群中分区以及副本的状态,控制器的选举需要依赖于Zookeeper
    • 选举分区的leader:
      • kafka中,分区的leader的选举操作也通过controller完成,选举一般在创建topic的时候和leader上下线的时候
      • kafka中默认的leader的选举策略是OfflinePartitionLeaderElectionStrategy,从AR中按顺序查找第一个存活的副本(存活的副本的集合为ISR),副本必须在ISR中,如果不进行分区的重新分配,AR中的副本以及顺序是不变的,但是ISR会变。一般来说,Leader就是优先副本
    • 底层以log形式存储,分为不同的Segment(大小为1G)。Segment内部采用索引的方式查询

节点的服役和退役

服役

  • 启动新的一个服务器(假设已经启动了hadoop102、hadoop103、hadoop104)

  • 创建一个要均衡的topic

    1
    2
    3
    4
    5
    6
    7
    $ vim topics-to-move.json
    {
    "topics": [
    {"topic": "first"}
    ],
    "version": 1
    }
  • 生成一个负载均衡计划

    1
    2
    3
    4
    5
    $ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
    Current partition replica assignment
    ...
    Proposed partition reassignment configuration
    ...
  • 创建副本存储计划(数据存储在broker0、),该计划即为上一个命令的输出Proposed partition reassignment configuration

    1
    2
    3
    4
    $ vim increase-replication-factor.json

    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
    any","any"]}]}
  • 执行副本存储计划

    1
    $ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  • 验证副本存储计划

    1
    $ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

退役

  • 退役hadoop105

  • 与节点服役类似,创建一个要均衡的topic,然后创建执行计划(原本的broker-list为0,1,2,3)

    1
    $ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
  • 复制上面的输出,创建副本存储计划increase-replication-factor.json并执行,最后停止hadoop105的kafka

    1
    $ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

Kafka副本

基本信息

  • 提高数据可靠性,Kafka默认副本为1个,生产环境一般配置为2个
  • Kafka中副本分为:Leader和Follower——Hadoop中的副本等价,不区分leader、follower
    • 生产者和消费者的操作对象为Leader
    • Follower通过Leader同步数据
  • 分区中的所有副本统称为AR(Assigned Repllicas),AR = ISR + OSR
    • ISR:和Leader保持同步的Follower集合(包括Leader本身)。Follower长时间不与Leader通信,则被踢出 ISR。时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader挂掉后从ISR选举新Leader
    • OSR:Follower与Leader副本同步时,延迟过多的副本

选举流程

  • 有一个broker的Controller会被选举为Controller Leader(抢占式),管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作

    image-20220818112805185
  • 选择ISR中的、位于AR前面的节点作为Leader

故障处理

  • Follower故障

    image-20220818113651188 * LEO(Log End Offset):每个副本的最后一个offset,LEO是最新的offset+1 * HW(High Watermark):所有副本中最小的LEO,HW之前的数据才对Consumer可见 * Follower发生故障后会被临时踢出ISR,期间Leader和其他Follower继续接收数据;该Follower恢复后,会读取本地磁盘记录的上次的HW,将log文件高于HW的部分截掉,从HW向Leader同步,等该Follower的LEO大于等于该Partition的HW,该Follower就追上了,可以重新加入ISR * 上图紫色为新读取的数据,broker2本来有数据5、6,挂掉后,broker0、1继续读取数据,HW移动到8(此时broker0读取到9,broker1同步到7),broker2重连,将高于原HW的数据5、6删除,同步5、6、7,此时broker2的LEO大于等于当前的HW,broker2可以重新加入ISR
  • Leader故障

    • Leader发生故障后,从ISR选出一个新的Leader,其余的Follower先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据
    • 只能保证副本之间的数据一致性,不能保证数据不丢失或者不重复(例如,上一个leader处理到5、6、7,但新leader只保留数据到4)

分区副本的分配

  • kafka的分区数大于服务器台数:Leader按循环分配到不同的broker,保证负载均衡

    1
    $ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic test
    image-20220818122928534
  • 手动调整分区副本存储:创建一个新的topic,4个分区两个副本,将topic的所有副本存储到broker0、broker1

    • 创建topic

      1
      $ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic testnew
    • 创建副本存储计划, 所有副本指定存储在broker0、broker1,执行

      1
      2
      3
      4
      5
      6
      7
      8
      9
      $ vim increase-replication-factor.json
      {
      "version":1,
      "partitions":[{"topic":"testnew","partition":0,"replicas":[0,1]},
      {"topic":"testnew","partition":1,"replicas":[0,1]},
      {"topic":"testnew","partition":2,"replicas":[1,0]},
      {"topic":"testnew","partition":3,"replicas":[1,0]}]
      }
      $ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
      image-20220818123310191
  • Leader Partition负载平衡:

    • Kafka会自动把Leader Partition均匀分散在各个机器上,保证每台机器的读写吞吐量都是均匀的

    • 如果某些broker宕机,会导致Leader Partition重新选举后集中在其他少部分几台broker,宕机的broker重启之后会重新成为Follower,造成集群负载不均衡

    • 通过参数设置再平衡:

      image-20220818123541315
  • 增加副本数目

    • 创建topic(一个副本)

      1
      $ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic testAdd
    • 手动增加副本(3个副本):创建副本存储计划并执行

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      $ vim increase-replication-factor.json
      {
      "version":1,
      "partitions":[
      {"topic":"testAdd","partition":0,"replicas":[0,1,2]},
      {"topic":"testAdd","partition":1,"replicas":[0,1,2]},
      {"topic":"testAdd","partition":2,"replicas":[0,1,2]}
      ]
      }
      $ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

文件存储

文件存储机制

image-20220818124635278
  • Topic是逻辑的概念,partition是物理的概念,每个partition对应一个log文件,log文件中存储的Producer发送的数据,数据追加到log文件末端

  • Kafka采取了分片和索引机制,每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件,位于一个文件夹下,文件夹的命名规则为:topic名称+分区序号,例如:first-0

    • .log:日志文件
    • .index:偏移量索引文件
    • .timeindex:时间戳索引文件
    • index和log文件以当前segment的第一条消息的offset命名
  • Log文件与Index文件:

    image-20220818131449418

    image-20220818131645180

文件清理策略

  • 默认的日志保存时间为7天

    • log.retention.hours,最低优先级,小时为单位,默认 7 天
    • log.retention.minutes,优先级高一些,分钟为单位
    • log.retention.ms,最高优先级,毫秒为单位
    • log.retention.check.interval.ms,检查周期,默认5分钟
  • delete清理策略:log.cleanup.policy = delete

    • 基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳(segment中最后一个记录的时间戳)
    • 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment
  • compact清理策略:log.cleanup.policy = compact

    • 对于相同key的不同value值,只保留最后一个版本

      image-20220818132231857

    • 压缩后的offset可能是不连续的,上图中没有offset 6

    • 该策略只适合特殊场景,例如消息的key是用户ID,value是用户的资料,压缩后的消息集保存用户最新的资料

数据读写的高效性

  • 高效:

    • Kafka 本身是分布式集群,可以采用分区技术,并行度高
    • 读数据采用稀疏索引,快速定位要消费的数据
    • 顺序写磁盘(写的过程是追加到文件末端)
    • 页缓存 + 零拷贝
  • 零拷贝:Kafka的数据加工操作交给生产者和消费者,Broker应用层不关心存储的数据,因此数据传输时不经过应用层

  • PageCache页缓存:

    • Kafka重度依赖底层操作系统提供的PageCache功能
    • 当上层有写操作时,操作系统将数据写入PageCache,kernel space决定什么时候数据落盘
    • 当上层有读操作时,先从PageCache查找,找不到再去磁盘中读取
    image-20220818132537185 image-20220818132555472 image-20220818133110032

Consumer

消费方式

image-20220818180128357
  • pull模式:主动从broker拉取数据,但如果broker没有数据,消费者可能陷入循环(一直返回空数据)
  • push模式:由broker决定消息发送速率——很难适应所有消费者的消费速率

工作流程

image-20220818180306004
  • 总体流程:

    • Producer发送数据到broker的leader,follower同步数据
    • 消费者相互独立
    • 每个分区的数据,只能由消费者组里的一个消费者消费——将消费者组视为一个独立的消费者
    • 通过offset,记录消费者目前消费到的位置(老版本中offset存储在zk中,现在由系统主题保存到磁盘,减少网络通信)
  • 消费者组

    image-20220818180951411
    • 消费者组由多个consumer组成,同组的consumer的groupid相同
    • 消费者组之间互不影响
    • 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    • 如果向消费组中添加更多的消费者,最终超过主题分区数量,则有一部分消费者会闲 置,不接收任何消息
  • 消费者组初始化流程:

    image-20220818181232693 * coordinator:辅助消费者组的初始化和分区的分配,每个broker都有一个。节点选择=groupid的hashcode % 50( `__consumer_offsets`的分区数量)。消费者组下的消费者向该coordinator提交offset * 过程: * 组内每个消费者向该coordinator发送加入组的请求 * coordinator随机选出该消费者组的leader,将要消费的topic的情况(该topic的分区情况等信息)发给leader * leader制定消费计划,确定组内各个消费者分别消费的分区号(见后面的分区分配策略) * coordinator下发方案给这个组的所有消费者 * **coordinator会监控消费者的心跳和消费能力**
  • 消费者组消费流程:

    image-20220818181312085 * 消费者创建消费者网络客户端 * 首先调用sendFetches方法,协调每批次拉取的大小和拉取时间间隔,方法中包含三个参数 * 调用send方法发送请求,通过回调方法onSuccess将结果拉取到消息队列 * 反序列化、拦截器、数据处理

消费者API

  • 独立消费者:

    • 消费topic的数据(要求生产者发送了相应topic的数据)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      Properties properties = new Properties();

      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

      // 配置消费者组
      properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
      // 创建消费者对象
      KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
      // 注册要消费的主题(可以消费多个主题)
      ArrayList<String> topics = new ArrayList<>();
      topics.add("first");
      kafkaConsumer.subscribe(topics);

      // 拉取数据打印
      while (true) {
      // 设置 1s 中消费一批数据
      ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
      // 打印消费到的数据
      for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
      System.out.println(consumerRecord);
      }
      }
    • 消费某个主题的某个分区(结合之前的生产者partition)

      1
      2
      3
      4
      // 消费某个主题的某个分区数据
      ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
      topicPartitions.add(new TopicPartition("first", 0));
      kafkaConsumer.assign(topicPartitions);
  • 消费者组:复制一份基础消费者的代码,同时启动即可启动同一个消费者组中的两个消费者

  • 注意,如果想要重复消费已经消费过的数据,要改groupid并且properties.setProperty("auto.offset.reset", "earliest");,或者停止当前的消费者线程,重新运行时代码里主动修改offset。因为原来的groupid的offset已经变了!参考kafka常见问题如果想消费已经被消费过的数据

分区的分配和再平衡

  • 消费者组中包含多个消费者,topic包含多个分区——哪个consumer消费哪个partition的数据?

  • 四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky

  • 配置参数partition.assignment.strategy修改分区的分配策略

  • 默认策略是Range + CooperativeSticky——可以同时使用多个分区分配策略

    image-20220818201226095

Range 以及再平衡

image-20220818200731688
  • Range是对一个topic的
  • 对同一个topic的分区按序号排序,对消费者按字母顺序排序
    • 如果有7个分区,3 个消费者,排序后分区为:0,1,2,3,4,5,6,消费者为C0,C1,C2,除不尽则前面的消费者消费的分区数目增加
    • 即,partitions数/consumer数,决定每个消费者消费几个分区。如果除不尽,前面的消费者将会多消费1个分区
  • 当topic数目增多,C0消费者的分区消费数目,会比其他消费者明显更多,容易产生数据倾斜
  • 再平衡:如果停止C0,则会重新按照range方式分配——平衡后,C1消费原来的0、1、3、4号分区数据,C2消费者原来的消费2、5、6号分区数据,新的数据分区后,C1消费0、1、2、3、4号分区,C2消费4、5、6号分区

RoundRobin 以及再平衡

image-20220818201400592
  • 把所有partition和组内所有consumer列出,按照hashcode排序,通过轮询算法分配partition到各个消费者
  • 再分配:如果C0挂掉,数据轮询内容为0、3、6号分区,平衡后C1消费0、1、4、6号分区数据,C2消费者消费2、3、5号分区数据

Sticky 以及再平衡

  • 执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,节省大量的开销
  • 与Range相似,但同样的例子里,C0消费者虽然消费三个分区,分区号是随机的

Offset

维护位置

  • consumer默认将offset保存在Kafka一个内置的topic中:__consumer_offsets

  • __consumer_offsets采用key-value存储数据

    • key是group.id+topic+分区号,value是当前offset的值
    • 每隔一段时间,kafka会对这个topic进行compact,group.id+topic+分区号保留最新数据
  • 如果要查看这个topic,则配置文件config/consumer.properties中添加配置 exclude.internal.topics=false, 默认true,表示不能消费系统主题

    1
    $ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

自动提交offset

image-20220818203713056 image-20220818204432639
1
2
3
// 自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 是否自动提交 offset
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 提交 offset 的时间周期 1000ms,默认 5s

手动提交offset

  • commitSync同步提交:阻塞当前线程,发送提交offset请求,一直到提交成功,并且会自动失败重试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 手动提交offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    ...
    // 拉取数据打印
    while (true) {
    // 设置 1s 中消费一批数据
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    // 打印消费到的数据
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    System.out.println(consumerRecord);
    }
    // 同步提交 offset
    kafkaConsumer.commitSync();
    }
  • commitAsync异步提交:发送完提交offset请求后,就开始拉取下一批数据,没有失败重试机制,有可能提交失败——修改为kafkaConsumer.commitAsync();即可

指定offset消费

image-20220818204645671
  • 当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量(例如数据已删除)

  • 参数:auto.offset.reset = earliest | latest | none,默认是latest

    • earliest:自动将偏移量重置为最早的偏移量,–from-beginning
    • none:未找到消费者组的先前偏移量,则抛出异常
  • 指定任意offset位置开始消费——要先获得消费者对应的分区!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 配置消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    // 创建消费者对象
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    // 注册要消费的主题(可以消费多个主题)
    List<String> topics = new ArrayList<>();
    topics.add("first");
    kafkaConsumer.subscribe(topics);

    // 指定位置消费
    Set<TopicPartition> assignment = new HashSet<>();
    while (assignment.size() == 0) {
    kafkaConsumer.poll(Duration.ofSeconds(1));
    // 获取消费者分区分配方案(有了分区分配信息才能开始消费)
    assignment = kafkaConsumer.assignment();
    }
    // 遍历所有分区,并指定 offset 从 5 的位置开始消费
    for (TopicPartition tp: assignment) {
    kafkaConsumer.seek(tp, 5);
    }

重新按照时间消费(指定时间消费)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 获得分区分配方案
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>(); // 每个分区对应的一天前的时间
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000);
}
// 一天前,每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null) {
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}

漏消费和重复消费

  • 重复消费:已经消费了数据,但是offset没提交(自动提交offset引起)

    image-20220818203246832

  • 漏消费:先提交offset后消费,有可能会造成数据的漏消费(offset被提交,但数据还在内存中未落盘,刚好消费者线程被kill,此时offset已经提交但是数据未处理,这部分内存中的数据丢失,下次启动是broker显示已经offset已经移动,但实际上消费者没有这个数据)

    image-20220818203300367

消费者事务

  • 完成Consumer端的精准一次性消费(不漏消费也不重复消费),需要Kafka消费端将消费过程和提交offset过程做原子绑定——需要将Kafka的offset保存到支持事务的自定义介质(如 MySQL)
  • 下游消费者必须支持事务,才能做到精确的一次性消费

数据积压(提高消费者吞吐量)

  • 如果消费能力不足,考虑增加Topic的分区数,同时提升消费组的消费者数量,消费者数 = 分区数
  • 下游的数据处理不及时:提高每批次拉取的数量