Kafka
# 阻塞队列
BlockingQueue
- 解决线程通信的问题
- 阻塞方法:put、take
生产者消费者模式
- 生产者:产生数据的线程
- 消费者:使用数据的线程
实现类
- ArrayBlockingQueue
- LinkedBlockQueue
- PriorityBlockingQueue,SynchronousQueue,DelayQueue等
示例
- 生产者线程
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 消费者线程
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
结果
# Kafka入门
- Kafka是一个分布式的流媒体平台;
- 应用:消息系统、日志收集、用户行为追踪、流失处理。
Kafka是一个消息队列,放里面放数据叫生产者,取数据叫消费者。
一个消息中间件,队列不单单只有一个,然后给每个队列取个名字,叫topic。
然后生产者,消费者就到指定的消息队列中存取数据就可以了。
为了提高一个队列的吞吐量,Kafka会把topic分区
实际上,生产者消费者是到指定topic,指定分区中存取数据
一台Kafka服务器叫做Broker
一个话题中的多个分区,可能存在不同的服务器上
Kafka将数据存放在不同的分区上,同时也会将分区备份,存在不同的broker上。备份分区只用做备份,不做读写。当某个broker挂了,那么就从其它broker中的分区作为主分区。
消费者
消费者可以有多个,形成一个消费者组。比如本来一个消费者要消费三个分区,那么可以通过一个消费者组,让三个消费者分别消费一个分区。
使用消息队列不可能是单机的(必然是分布式or集群)
Kafka会将partition以消息日志的方式(落磁盘)存储起来,通过 顺序访问IO和缓存(等到一定的量或时间)才真正把数据写到磁盘上,来提高速度。
Kafka会将数据写到partition,单个partition的写入是有顺序的。如果要保证全局有序,那只能写入一个partition中。如果要消费也有序,消费者也只能有一个。
凡是分布式就无法避免网络抖动/机器宕机等问题的发生,很有可能消费者A读取了数据,还没来得及消费,就挂掉了。Zookeeper发现消费者A挂了,让消费者B去消费原本消费者A的分区,等消费者A重连的时候,发现已经重复消费同一条数据了。(各种各样的情况,消费者超时等等都有可能…)
如果业务上不允许重复消费的问题,最好消费者那端做业务上的校验(如果已经消费过了,就不消费了)
# Kafka为什么这么快
顺序写
虽然说Kafka中的数据持久到到磁盘中,磁盘读写速度比不上内存读写速度。
都知道完成一次磁盘io,需要经过寻道、旋转和读写数据。
因为Kafka中某一个分区内的数据是有序的,队列FIFO,所以可以采用顺序写的方式进行持久化。
Kafka采用顺序写的方式,来省去寻道和旋转产生的时间,从而提高写入磁盘的速度。
每一个分区(Partition)对应多个物理文件(Segment)。
零拷贝
消费者读取磁盘中的数据,使用传统的IO模型,需要拷贝四次
零拷贝方式,减少用户态和内核态直接的切换
PageCache
生产者生产消息到Broker时,先写入到page cache中,page cache存在内存当中,但不受jvm的GC管理,这样可以避免每次生产消费都需要磁盘io。
网络模型
采用基于池化思想,避免为每个连接创建线程,连接完成后将业务处理交给线程池处理。
基于 IO 复用模型,多个连接共用同一个阻塞对象,不用等待所有的连接。
批量与压缩
生产者向Broker发送消息是按照批量来发送,假设带宽是10MB/s,那么发送一个10MB的消息比发送1MB的消息10次要快。
多数情况下,系统的瓶颈不在磁盘IO,而是在网络IO。
分区并发
Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。
每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。每次文件操作也是直接操作segment。
为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件。
# 夺命连环问
# 说说你对Kafka的理解
Kafka是一个流式数据处理平台,具有消息系统的能力,也有实时流式数据处理分析能能力,只是我们通常把它当做消息队列系统来用。
主要由三个方面组成,ZooKeeper、Kafka核心和存储。
Kafka一般作为分布式系统来用,所以需要每个Kafka服务器启动时需要将自己注册到ZooKeeper中,由ZooKeeper来统一管理。
然后Kafka本身,有消息、话题、生产者、消费者、分片、Broker、组等等。
最后是存储方面,用来持久化存储Kafka的数据,都会以日志的形式最终存入到磁盘中。
# 消息队列模型知道吗?kafka是怎么做到支持这两种模型的?
点对点
:消息只能被一个消费者消费,消费完后消息删除。
发布订阅
:相当于广播模式,消息可以被所有消费者消费。消费者组,由多个消费者组成,一个组内只会由一个消费者去消费一个分区的消息。
# 能说说kafka通信过程原理吗?
- 首先Kafka Broker启动时,将自己注册到ZooKeeper中;
- 生产者根据配置的地址连接到指定的Broker,建立TCP连接;
- 发送消息;
- 消费者和协调者Broker创建TCP连接;
- 开始消费消息。
# 发送消息时如何选择分区的?
- 轮询,按照顺序消息依次发送到不同的分区
- 随机,随机发送到某个分区
如果消息指定key,那么会根据消息的key进行hash,然后对partition分区数量取模,决定落在哪个分区上,所以,对于相同key的消息来说,总是会发送到同一个分区上,也是我们常说的消息分区有序性。
很常见的场景就是我们希望下单、支付消息有顺序,这样以订单ID作为key发送消息就达到了分区有序性的目的。
如果没有指定key,会执行默认的轮询负载均衡策略,比如第一条消息落在P0,第二条消息落在P1,然后第三条又在P1。
除此之外,对于一些特定的业务场景和需求,还可以通过实现
Partitioner
接口,重写configure
和partition
方法来达到自定义分区的效果。
# 为什么需要分区?有什么好处?
如果说不分区的话,我们发消息写数据都只能保存到一个节点上,这样的话就算这个服务器节点性能再好最终也支撑不住。
分区带来了负载均衡和横向扩展的能力。
发送消息时可以根据分区的数量存在不同的Broker上,提升了并发读写消息能力。
# 详细说说消费者组和消费者重平衡?
一般来说,消费者数量和所有主题分区的数量保持一致最好,消费者组可以让Kafka支持传统的两种消息队列模型。
Kafka中有一个协调者来 完成分区的分配,而重平衡Rebalance就是指的有新消费者加入的情况,比如刚开始我们只有消费者A在消费消息,过了一段时间消费者B和C加入了,这时候分区就需要重新分配。
# 分区分配策略?
Range
对同一个主题的分区,排序优先均匀分给前面的消费者,排在前面的消费者获得的消息 >= 后面的消费者。
RoundRobin
轮询,按顺序以此分配给消费者。
Sticky
之前这个分区 分给消费者,那么下次还是尽量分给他,避免频繁的销毁创建连接。
# 消息传递语义剖析
1)首先当 Producer 向 Broker 发送数据后,会进行 commit,如果commit成功,由于 Replica 副本机制的存在,则意味着消息不会丢失,但是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该消息是否已经被提交(commit),这就可能造成 at least once 语义。
2)在 Kafka 0.11.0.0 之前, 如果 Producer 没有收到消息 commit 的响应结果,它只能重新发送消息,确保消息已经被正确的传输到 Broker,重新发送的时候会将消息再次写入日志中;而在 0.11.0.0 版本之后, Producer 支持幂等传递选项,保证重新发送不会导致消息在日志出现重复。为了实现这个, Broker 为 Producer 分配了一个ID,并通过每条消息的序列号进行去重。也支持了类似事务语义来保证将消息发送到多个 Topic 分区中,保证所有消息要么都写入成功,要么都失败,这个主要用在 Topic 之间的 exactly once 语义。
其中启用幂等传递的方法配置:enable.idempotence = true。
启用事务支持的方法配置:设置属性 transcational.id = "指定值"。
3)从 Consumer 角度来剖析, 我们知道 Offset 是由 Consumer 自己来维护的, 如果 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会造成 at most once 语义(消息会丢,但不重复)。
4)如果 Consumer 消费消息完成后, 再更新 Offset,如果这时 Consumer crash 掉,那么新的 Consumer 接管后重新用这个 Offset 拉取消息, 这时就会造成 at least once 语义(消息不丢,但被多次重复处理)。
# Kafka三次消息传递
1)Producer 端发送消息给 Kafka Broker 端。
2)Kafka Broker 将消息进行同步并持久化数据。
3)Consumer 端从Kafka Broker 将消息拉取并进行消费。
Kafka 只对「已提交」的消息做「最大限度的持久化保证不丢失」。
已提交的数据指Producer段发送的消息已经有N个Broker成功收到了,而且至少有一个Broker存活,那么就能保证消息持久化保证不丢失。
# 消息丢失场景
# Producer端
1)首先我们要知道一点就是Producer 端是直接与 Broker 中的 Leader Partition 交互的,所以在 Producer 端初始化中就需要通过 Partitioner 分区器从 Kafka 集群中获取到相关 Topic 对应的 Leader Partition 的元数据。
2)待获取到 Leader Partition 的元数据后直接将消息发送过去。
3)Kafka Broker 对应的 Leader Partition 收到消息会先写入 Page Cache,定时刷盘进行持久化(顺序写入磁盘)。
4)Follower Partition 拉取 Leader Partition 的消息并保持同 Leader Partition 数据一致,待消息拉取完毕后需要给 Leader Partition 回复 ACK 确认消息。
5)待 Kafka Leader 与 Follower Partition 同步完数据并收到所有 ISR 中的 Replica 副本的 ACK 后,Leader Partition 会给 Producer 回复 ACK 确认消息。
丢失情况可能发生在Producer端异步发送消息给Broker
- 网络原因: 由于网络抖动导致数据没有发送过去
- **数据原因:**消息体太大超出Broker接收范围
通过配置参数来确认消息是否发送成功
# Broker端
KafkaBroker 集群接收到数据后会将数据进行持久化存储到磁盘,为了提高吞吐量和性能,采用的是「异步批量刷盘的策略」,也就是说按照一定的消息量和间隔时间进行刷盘。首先会将数据存储到 「PageCache」 中,至于什么时候将 Cache 中的数据刷盘是由「操作系统」根据自己的策略决定或者调用 fsync 命令进行强制刷盘,如果此时 Broker 宕机 Crash 掉,且选举了一个落后 Leader Partition 很多的 Follower Partition 成为新的 Leader Partition,那么落后的消息数据就会丢失。
# Consumer端
1)Consumer 拉取数据之前跟Producer 发送数据一样, 需要通过订阅关系获取到集群元数据,找到相关 Topic 对应的 Leader Partition 的元数据。
2)然后 Consumer 通过 Pull 模式主动的去 Kafka 集群中拉取消息。
3)在这个过程中,有个消费者组的概念(不了解的可以看上面链接文章),多个 Consumer 可以组成一个消费者组即 Consumer Group,每个消费者组都有一个Group-Id。同一个 Consumer Group 中的 Consumer 可以消费同一个 Topic 下不同分区的数据,但是不会出现多个 Consumer 去消费同一个分区的数据。
4)拉取到消息后进行业务逻辑处理,待处理完成后,会进行 ACK 确认,即提交 Offset 消费位移进度记录。
5)最后 Offset 会被保存到 Kafka Broker 集群中的 __consumer_offsets 这个 Topic 中,且每个 Consumer 保存自己的 Offset 进度。
Consumer拉取后消息最终是要提交Offset,那么这里就可能会丢失数据
可能使用的「自动提交 Offset 方式」
拉取消息后「先提交 Offset,后处理消息」,如果此时处理消息的时候异常宕机,由于 Offset 已经提交了, 待 Consumer 重启后,会从之前已提交的 Offset 下一个位置重新开始消费, 之前未处理完成的消息不会被再次处理,对于该 Consumer 来说消息就丢失了。
拉取消息后「先处理消息,在进行提交 Offset」, 如果此时在提交之前发生异常宕机,由于没有提交成功 Offset, 待下次 Consumer 重启后还会从上次的 Offset 重新拉取消息,不会出现消息丢失的情况, 但是会出现重复消费的情况,这里只能业务自己保证幂等性。
# 解决方案
发送的调用方式改为异步方式
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
//intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);}
然后就是修改一些配置项
producer端发送消息重试次数,retries 设为最大值;
重试时间retry.backoff.ms 设为300ms;
unclean.leader.election.enable ,false表示不要在ISR列表外的follow选举leader,因为那些副本落后原来leader很多;
replication.factor,设置分区副本的个数,>=3;
min.insync.replicas,ISR个数,也是“已提交”个数;
enable.auto.commit = false,设置消费端手动移位offset,业务自己保证幂等性。