Kafka (1)消息队列概述、生产者

Kafka:概述、架构、生产者

概述

  • 一般的过程:

    • 前端埋点,记录用户购买的行为数据(浏览、点赞、收藏、评论)

    • 采集log到日志服务器,通过Flume采集数据,落盘,送入Kafka集群缓冲,Hadoop集群按照自己的处理速度从Kafka中消费数据

      image-20220816110822333

  • 发布、订阅:消息的发布者不将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息(如上例,日志分为浏览、点赞、收藏,订阅者按照自己的速度选择性接收消息)

  • 定义:开源的分布式事件流平台,不仅是基于发布/订阅模式的消息队列,还可以用于流分析、数据集成和关键任务应用

消息队列

  • 消息队列:

    • 类型:

      • JavaEE中主要采用ActiveMQ、 RabbitMQ、RocketMQ
      • 大数据场景下使用Kafka
    • 应用:缓存/消峰、解耦和异步通信

      • 缓冲、消峰:控制数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致

        image-20220816111504446
      • 解耦:数据源很多,数据接收很多。如果没有消息队列,则每个数据源都要实现不同接收方的接口(n*m)。消息队列使得数据源、数据接收方只实现相同的接口即可(n+m)

        image-20220816111520299
      • 异步通信:用户把一个消息放入队列,系统不立即处理它,在空闲了才处理它(以用户注册+发送短信为例)

        image-20220816111701219
  • 消息队列的模式:

    • 点对点:生产者将数据发送到队列,消费者主动拉取数据,拉取后向队列反馈收到,队列删除数据——只有一个topic

      image-20220816111951446
    • 发布/订阅:

      • 多个topic(主题,如浏览、点赞、收藏等),不同topic的队列不同

      • 消费者消费数据后不删除数据——消费者相互独立,都可以消费数据

      • 消息队列自己决定是否删除数据

        image-20220816112007908

架构

image-20220816112303252

  • 一个主题的数据(上图为100T)分为多个块(分区,partition),每个partition是一个有序的队列,让多个Kafka服务器分别存储(broker0-broker2),服务器可以存储多个topic的分区
  • 消费者组(CG):由多个消费者组成,每个消费者消费不同分区的数据,一个分区只能由一个组内消费者消费,但一个分区可以由多个组的消费者消费(上图,partition0只发给了group2的第一个消费者,而partition2分别发给了不同group的第二个消费者)
  • 副本:每个分区都有多个副本(如上图的TopicA-Partition0,broker0中有一个,broker2中有一个),多个副本分为leader和follower
    • leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader
    • follower:每个分区多个副本中的“从”,实时从leader同步数据。leader故障时,某个follower成为新的leader
  • zookeeper:存储Kafka服务器节点的运行状态,存储各个分区的副本中谁是leader(kafka逐渐在抛弃zookeeper,2.8.0以后可以选择使用kraft)

部署

  • kafka中,broker代码用scala编写,producer、consumer用java编写

  • 集群规划:三个zk,三个kafka

    image-20220816113609319

  • 文件配置server.properties:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=0
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/opt/module/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
  • hadoop103 和 hadoop104 修改配置文件使得broker.id为1、2——broker.id在整个集群中唯一

  • 集群启动脚本:kf.sh start

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #! /bin/bash
    case $1 in
    "start"){
    for i in hadoop102 hadoop103 hadoop104
    do
    echo " --------启动 $i Kafka-------"
    ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
    };;
    "stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
    echo " --------停止 $i Kafka-------"
    ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
    };;
    esac
  • 集群停止时,需要Kafka所有节点进程全部停止后再停止Zookeeper集群

命令行操作

image-20220816114651760

  • bin/kafka-topics.sh

    image-20220816123151603 * `kafka-topics.sh --bootstrap-server hadoop102:9092 --list` * `kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first` * `kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first` * `kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3`——分区数只能增加,不能减少
  • bin/kafka-console-producer.sh

    image-20220816123415961
    • 发送消息:bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
  • bin/kafka-console-consumer.sh

    image-20220816123655902
    • 消费first主题的数据:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    • 从主题first中读取所有数据,包括历史数据:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

生产者

producer消息发送流程

image-20220816133654678

  • 两个线程
    • main线程中创建一个producer对象,并创建一个双端队列RecordAccumulator,线程将消息发送给RecordAccumulator
    • Sender线程从RecordAccumulator中拉取消息发送到Kafka Broker
  • 调用send方法发送数据,数据发送过程中,可能会经过拦截器,并通过序列化器序列化消息(Java的序列化太重,因此使用自带的序列化器)
  • 分区器:由分区器决定消息应当分配到哪个partition,每个partition对应RecordAccumulator的一个队列(队列大小为32M)
  • 数据的一个batch达到16k或者linger.ms到达时间,则sender线程拉取一个batch
  • sender线程拉取数据,创建请求。针对每个Broker,都有一个InFlightRequest请求队列存放在NetWorkClient,默认每个队列缓存5个请求,请求通过Selector异步发送到Kafka集群
  • 数据发送到kafka集群后,集群同步副本,返回应答(三个级别,0、1、-1)

异步发送 API

  • 异步发送时,外部数据发送到RecordAccumulator,不管RecordAccumulator中前一批数据是否已经发送到Kafka broker。同步发送时,则需要等待上一批数据完全发送到Kafka并收到相应ack后,当前数据才能发到RecordAccumulator

无回调异步

  • 创建生产者,发送数据,关闭资源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 生产者配置
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 连接集群
    // 指定key,value的序列化器:key.serializer,value.serializer
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 创建 kafka 生产者对象
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

    // 发送消息
    for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new ProducerRecord<>("test", " " + i));
    }
    // 5. 关闭资源
    kafkaProducer.close();

带回调异步

image-20220817095012621
  • 发往队列RecordAccumulator,队列返回信息
  • 回调:
    • 调用方把一个需要被回调的方法作为参数,传递给要调用的方法,被调用的方法执行完成后,通过该参数返回来调用此方法,作为参数传递的该方法就称作回调方法
    • 场景:A模块的某些方法,需要在B模块满足某些条件下来主动通知并调用A模块的这些方法
    • 参考:JAVA回调机制(CallBack)详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

for (int i = 0; i < 5; i++) {
// 实现接口Callback的onCompletion 用于回调
kafkaProducer.send(new ProducerRecord<>("test", " " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
exception.printStackTrace();
}
}
});
Thread.sleep(2);
}

同步发送

1
2
3
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test", " " + i)).get();
}

生产者分区(分区器)

  • 数据->拦截器->序列化->分区器->RecordAccumulator

  • 将数据分块存储在多个Broker,根据Broker的条件(计算资源、存储资源等)设置控制分区的任务,实现负载均衡

    image-20220817101357839
  • 生产者的分区策略:

    • 默认的分区器:DefaultPartitioner

      image-20220817102131273

      • 前四个,指明partition的情况下,直接将指明的值作为分区值

        1
        2
        3
        kafkaProducer.send(new ProducerRecord<>("test", 1, "", "test " + i), new Callback() {
        ...
        }
      • 第五个,没有指明partition值但有key,将key的hash与topic的partition数取余得到partition值

      • 第六个,既没有partition值又没有key值,Kafka采用Sticky Partition(黏性分区器),随机选择一个分区,尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)

    • 自定义分区器:

      • 实现Partitioner接口,重写partition()

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        public class PartitionerSelf implements Partitioner {

        /**
        * @param topic: 主题
        * @param key: key
        * @param keyBytes: key序列化的结果
        * @param value: value
        * @param valueBytes: value序列化的结果
        * @param cluster: 集群元数据
        * @return
        */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 atguigu
        if (msgValue.contains("thomas")) {
        partition = 2;
        } else {
        partition = 3;
        }
        // 返回分区号
        return partition;
        }

        @Override
        public void close() {

        }

        @Override
        public void configure(Map<String, ?> map) {

        }
        }
      • 载入,调用

        1
        2
        3
        4
        5
        6
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "producer.PartitionerSelf");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("test", "thomas " + i), new Callback() {
        ...
        }

提高吞吐量

1
2
3
4
5
6
7
8
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0 ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

数据可靠性

  • ack应答级别:

    • 0:生产者发送过来的数据,不需要等数据落盘,立刻应答

    • 1:生产者发送过来的数据,Leader收到数据后应答——leader应答后同步前,可能挂掉,新的leader没有该信心

    • -1:生产者发送过来的数据,Leader和ISR队列的所有节点收齐数据后应答

      • Leader维护一个动态的in-sync replica set(ISR)——和Leader保持同步的Follower+Leader集合((leader:0,isr:0,1,2))

      • 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定,默认30s)

      • 如果分区副本设置为1个,或ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,此时和ack=1的效果相同(leader:0,isr:0)

        image-20220817122028038

  • 数据完全可靠条件:ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

  • 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于对可靠性要求比较高的场景

    1
    2
    3
    4
    // 设置 acks
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    // 重试次数 retries,默认是 int 最大值,2147483647
    properties.put(ProducerConfig.RETRIES_CONFIG, 3);

数据去重

  • 数据传递语义:

    • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
    • 最多一次(At Most Once)= ACK级别设置为0
    • 精确一次:对于一些非常重要的信息,比如和钱相关的数据,数据既不能重复也不丢失
  • 幂等:

    • Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证不重复

    • 精确一次(Exactly Once) = 幂等性 + 至少一次

    • 重复数据判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条——单分区,单会话内不重复

      • PID:Kafka每次重启都会分配一个新的Producer ID
      • Partition:分区号
      • Sequence Number:单调自增
      image-20220817122405925
    • 开启幂等:参数enable.idempotence默认为true,false为关闭

  • 生产者事务:开启事务,必须开启幂等

    image-20220817123201764

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 1 初始化事务
    void initTransactions();
    // 2 开启事务
    void beginTransaction() throws ProducerFencedException;
    // 3 在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
    // 4 提交事务
    void commitTransaction() throws ProducerFencedException;
    // 5 放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // 设置事务 id(必须),事务 id 任意起名
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

    // 初始化事务
    kafkaProducer.initTransactions();
    // 开启事务
    kafkaProducer.beginTransaction();
    try {
    // 4. 调用 send 方法,发送消息
    for (int i = 0; i < 5; i++) {
    // 发送消息
    kafkaProducer.send(new ProducerRecord<>("test","test " + i));
    }
    // int i = 1 / 0; // 测试事务提交失败
    // 提交事务
    kafkaProducer.commitTransaction();
    } catch (Exception e) {
    // 终止事务
    kafkaProducer.abortTransaction();
    } finally {
    kafkaProducer.close();
    }

数据有序、乱序

  • 单分区内,有序(有条件); 多分区,分区与分区间无序
  • 单分区有序要求:
    • max.in.flight.requests.per.connection设置为1(没有开启幂等时)
    • max.in.flight.requests.per.connection设置小于等于5(开启幂等时)——启用幂等后,kafka服务端缓存producer发来的最近5个request的元数据,因此可以保证最近5个request的数据是有序的