Kafka:概述、架构、生产者
概述
一般的过程:
前端埋点,记录用户购买的行为数据(浏览、点赞、收藏、评论)
采集log到日志服务器,通过Flume采集数据,落盘,送入Kafka集群缓冲,Hadoop集群按照自己的处理速度从Kafka中消费数据
发布、订阅:消息的发布者不将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息(如上例,日志分为浏览、点赞、收藏,订阅者按照自己的速度选择性接收消息)
定义:开源的分布式事件流平台,不仅是基于发布/订阅模式的消息队列,还可以用于流分析、数据集成和关键任务应用
消息队列
消息队列:
类型:
- JavaEE中主要采用ActiveMQ、 RabbitMQ、RocketMQ
- 大数据场景下使用Kafka
应用:缓存/消峰、解耦和异步通信
缓冲、消峰:控制数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致
解耦:数据源很多,数据接收很多。如果没有消息队列,则每个数据源都要实现不同接收方的接口(n*m)。消息队列使得数据源、数据接收方只实现相同的接口即可(n+m)
异步通信:用户把一个消息放入队列,系统不立即处理它,在空闲了才处理它(以用户注册+发送短信为例)
消息队列的模式:
点对点:生产者将数据发送到队列,消费者主动拉取数据,拉取后向队列反馈收到,队列删除数据——只有一个topic
发布/订阅:
多个topic(主题,如浏览、点赞、收藏等),不同topic的队列不同
消费者消费数据后不删除数据——消费者相互独立,都可以消费数据
消息队列自己决定是否删除数据
架构
- 一个主题的数据(上图为100T)分为多个块(分区,partition),每个partition是一个有序的队列,让多个Kafka服务器分别存储(broker0-broker2),服务器可以存储多个topic的分区
- 消费者组(CG):由多个消费者组成,每个消费者消费不同分区的数据,一个分区只能由一个组内消费者消费,但一个分区可以由多个组的消费者消费(上图,partition0只发给了group2的第一个消费者,而partition2分别发给了不同group的第二个消费者)
- 副本:每个分区都有多个副本(如上图的TopicA-Partition0,broker0中有一个,broker2中有一个),多个副本分为leader和follower
- leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader
- follower:每个分区多个副本中的“从”,实时从leader同步数据。leader故障时,某个follower成为新的leader
- zookeeper:存储Kafka服务器节点的运行状态,存储各个分区的副本中谁是leader(kafka逐渐在抛弃zookeeper,2.8.0以后可以选择使用kraft)
部署
kafka中,broker代码用scala编写,producer、consumer用java编写
集群规划:三个zk,三个kafka
文件配置server.properties:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28#broker 的全局唯一编号,不能重复,只能是数字。
0 =
#处理网络请求的线程数量
3 =
#用来处理磁盘 IO 的线程数量
8 =
#发送套接字的缓冲区大小
102400 =
#接收套接字的缓冲区大小
102400 =
#请求套接字的缓冲区大小
104857600 =
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
/opt/module/kafka/datas =
#topic 在当前 broker 上的分区个数
1 =
#用来恢复和清理 data 下数据的线程数量
1 =
# 每个 topic 创建时的副本数,默认时 1 个副本
1 =
#segment 文件保留的最长时间,超时将被删除
168 =
#每个 segment 文件的大小,默认最大 1G
1073741824 =
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
300000 =
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka =hadoop103 和 hadoop104 修改配置文件使得broker.id为1、2——broker.id在整个集群中唯一
集群启动脚本:
kf.sh start
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac集群停止时,需要Kafka所有节点进程全部停止后再停止Zookeeper集群
命令行操作
* `kafka-topics.sh --bootstrap-server hadoop102:9092 --list` * `kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first` * `kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first` * `kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3`——分区数只能增加,不能减少bin/kafka-topics.sh
:bin/kafka-console-producer.sh
:- 发送消息:
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
- 发送消息:
bin/kafka-console-consumer.sh
:- 消费first主题的数据:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
- 从主题first中读取所有数据,包括历史数据:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
- 消费first主题的数据:
生产者
producer消息发送流程
- 两个线程
- main线程中创建一个producer对象,并创建一个双端队列RecordAccumulator,线程将消息发送给RecordAccumulator
- Sender线程从RecordAccumulator中拉取消息发送到Kafka Broker
- 调用send方法发送数据,数据发送过程中,可能会经过拦截器,并通过序列化器序列化消息(Java的序列化太重,因此使用自带的序列化器)
- 分区器:由分区器决定消息应当分配到哪个partition,每个partition对应RecordAccumulator的一个队列(队列大小为32M)
- 数据的一个batch达到16k或者linger.ms到达时间,则sender线程拉取一个batch
- sender线程拉取数据,创建请求。针对每个Broker,都有一个InFlightRequest请求队列存放在NetWorkClient,默认每个队列缓存5个请求,请求通过Selector异步发送到Kafka集群
- 数据发送到kafka集群后,集群同步副本,返回应答(三个级别,0、1、-1)
异步发送 API
- 异步发送时,外部数据发送到RecordAccumulator,不管RecordAccumulator中前一批数据是否已经发送到Kafka broker。同步发送时,则需要等待上一批数据完全发送到Kafka并收到相应ack后,当前数据才能发到RecordAccumulator
无回调异步
创建生产者,发送数据,关闭资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 生产者配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 连接集群
// 指定key,value的序列化器:key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test", " " + i));
}
// 5. 关闭资源
kafkaProducer.close();
带回调异步
- 发往队列RecordAccumulator,队列返回信息
- 回调:
- 调用方把一个需要被回调的方法作为参数,传递给要调用的方法,被调用的方法执行完成后,通过该参数返回来调用此方法,作为参数传递的该方法就称作回调方法
- 场景:A模块的某些方法,需要在B模块满足某些条件下来主动通知并调用A模块的这些方法
- 参考:JAVA回调机制(CallBack)详解
1 | KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); |
同步发送
1 | for (int i = 0; i < 5; i++) { |
生产者分区(分区器)
数据->拦截器->序列化->分区器->RecordAccumulator
将数据分块存储在多个Broker,根据Broker的条件(计算资源、存储资源等)设置控制分区的任务,实现负载均衡
生产者的分区策略:
默认的分区器:DefaultPartitioner
前四个,指明partition的情况下,直接将指明的值作为分区值
1
2
3kafkaProducer.send(new ProducerRecord<>("test", 1, "", "test " + i), new Callback() {
...
}第五个,没有指明partition值但有key,将key的hash与topic的partition数取余得到partition值
第六个,既没有partition值又没有key值,Kafka采用Sticky Partition(黏性分区器),随机选择一个分区,尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
自定义分区器:
实现Partitioner接口,重写partition()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class PartitionerSelf implements Partitioner {
/**
* @param topic: 主题
* @param key: key
* @param keyBytes: key序列化的结果
* @param value: value
* @param valueBytes: value序列化的结果
* @param cluster: 集群元数据
* @return
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue = value.toString();
// 创建 partition
int partition;
// 判断消息是否包含 atguigu
if (msgValue.contains("thomas")) {
partition = 2;
} else {
partition = 3;
}
// 返回分区号
return partition;
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}载入,调用
1
2
3
4
5
6properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "producer.PartitionerSelf");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test", "thomas " + i), new Callback() {
...
}
提高吞吐量
1 | // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd |
数据可靠性
ack应答级别:
0:生产者发送过来的数据,不需要等数据落盘,立刻应答
1:生产者发送过来的数据,Leader收到数据后应答——leader应答后同步前,可能挂掉,新的leader没有该信心
-1:生产者发送过来的数据,Leader和ISR队列的所有节点收齐数据后应答
Leader维护一个动态的in-sync replica set(ISR)——和Leader保持同步的Follower+Leader集合((leader:0,isr:0,1,2))
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定,默认30s)
如果分区副本设置为1个,或ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,此时和ack=1的效果相同(leader:0,isr:0)
数据完全可靠条件:ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于对可靠性要求比较高的场景
1
2
3
4// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
数据去重
数据传递语义:
- 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 最多一次(At Most Once)= ACK级别设置为0
- 精确一次:对于一些非常重要的信息,比如和钱相关的数据,数据既不能重复也不丢失
幂等:
Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证不重复
精确一次(Exactly Once) = 幂等性 + 至少一次
重复数据判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条——单分区,单会话内不重复
- PID:Kafka每次重启都会分配一个新的Producer ID
- Partition:分区号
- Sequence Number:单调自增
开启幂等:参数enable.idempotence默认为true,false为关闭
生产者事务:开启事务,必须开启幂等
1
2
3
4
5
6
7
8
9
10// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try {
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("test","test " + i));
}
// int i = 1 / 0; // 测试事务提交失败
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
kafkaProducer.close();
}
数据有序、乱序
- 单分区内,有序(有条件); 多分区,分区与分区间无序
- 单分区有序要求:
- max.in.flight.requests.per.connection设置为1(没有开启幂等时)
- max.in.flight.requests.per.connection设置小于等于5(开启幂等时)——启用幂等后,kafka服务端缓存producer发来的最近5个request的元数据,因此可以保证最近5个request的数据是有序的