参考文档:
Kafka 设计与原理详解:Kafka深度解析:Kafka controller重设计:controller详解:producer详解:kafka log 存储:Kafka如何实现producer幂等性:为什么需要消息系统
1)解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束2)冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕3)扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可4)灵活性 & 峰值处理能力: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃5)可恢复性: 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理6)顺序保证: 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)7)缓冲: 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况8)异步通信: 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们一、kafka是什么
Kafka是一种分布式的,基于发布/订阅的消息系统二、kafka的特性- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
三、kafka应用场景
> 日志收集:
一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等 > 消息系统:解耦和生产者和消费者、缓存消息等 > 流式处理:比如spark streaming和storm四、Kafka架构
- Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
- Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
- Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition.
- Producer:负责发布消息到Kafka broker
- Consumer:消息消费者,向Kafka broker读取消息的客户端
- Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
Topic & Partition
物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件
这个“log entries”并非由一个文件构成,而是分成多个segment,每个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围1)发送消息的时候可以指定partion,修改文件partition.class2)Kafka提供两种策略去删除旧数据 一是基于时间,二是基于partition文件大小 例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。3)Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offset由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障Partition的数据文件
Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性: offset MessageSize data其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致
我们来思考一下,如果一个partition只有一个数据文件会怎么样? 1)新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。 2)查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引
数据文件的分段Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中为数据文件建索引数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。 position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了
Producer
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。
在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)Kafka delivery guarantee > At most once 消息可能会丢,但绝不会重复传输 > At least one 消息绝不会丢,但可能会重复传输 > Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。 当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。Consumer
Kafka提供了两种Consumer API
> High Level Consumer API 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播)。因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义 > Low Level Consumer API (Kafka诡异的称之为Simple Consumer API,实际上非常复杂)Consumer Rebalance的算法:
1) 将目标Topic下的所有Partirtion排序,存于PT 2) 对某Consumer Group下所有Consumer排序,存于CG,第i个Consumer记为Ci 3) N=size(PT)/size(CG),向上取整 4) 解除Ci对原来分配的Partition的消费权(i从0开始) 5) 将第i∗N到(i+1)∗N−1个Partition分配给Ci目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发 Consumer Group的Rebalance,具体启动流程如下: 1) High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id] 2) 在/consumers/[consumer group]/ids上注册Watch 3) 在/brokers/ids上注册Watch 4) 如果Consumer通过Topic Filter创建消息流,则它会同时在/brokers/topics上也创建Watch 5) 强制自己在其Consumer Group内启动Rebalance流程 每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。Consumer Rebalance的缺陷:
Herd effect:任何broker或者consumer的增减都会触发所有的consumer的rebalance Split Brain:每个consumer分别单独通过Zookeeper判断哪些partition down了,那么不同consumer从Zookeeper“看”到的view就可能不一样,这就会造成错误的reblance尝试。而且有可能所有的consumer都认为rebalance已经完成了,但实际上可能并非如此。Metadata
metadata信息里包括了每个topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等
对于集群中的每一个broker都保存着相同的完整的整个集群的metadata信息。Kafka客户端从任一broker都可以获取到需要的metadata信息Metadata的存储 在每个Broker的KafkaServer对象中都会创建MetadataCache组件, 负责缓存所有的metadata信息。所在文件: core/src/main/scala/kafka/server/MetadataCache.scala。所有的metadata信息存储在map里, key是topic, value又是一个map, 其中key是parition id, value是PartitionStateInfoController
负责管理和协调Kafka集群
1) UpdateMetadataRequest 更新元数据请求 topic分区状态经常会发生变更(比如leader重新选举了或副本集合变化了等)。由于当前clients只能与分区的leader broker进行交互,那么一旦发生变更,controller会将最新的元数据广播给所有存活的broker。具体方式就是给所有broker发送UpdateMetadataRequest请求2) CreateTopics创建topic请求。当前不管是通过API方式、脚本方式抑或是CreateTopics请求方式来创建topic,做法几乎都是在Zookeeper的/brokers/topics下创建znode来触发创建逻辑,而controller会监听该path下的变更来执行真正的“创建topic”逻辑3) DeleteTopics 删除topic请求。和CreateTopics类似,也是通过创建Zookeeper下的/admin/delete_topics/<topic>节点来触发删除topic,controller执行真正的逻辑4) 分区重分配 即kafka-reassign-partitions脚本做的事情。同样是与Zookeeper结合使用,脚本写入/admin/reassign_partitions节点来触发,controller负责按照方案分配分区5) Preferred leader分配 preferred leader选举当前有两种触发方式: > 自动触发(auto.leader.rebalance.enable = true) > kafka-preferred-replica-election脚本触发 两者“玩法”相同,向Zookeeper的/admin/preferred_replica_election写数据,controller提取数据执行preferred leader分配6) 分区扩展 即增加topic分区数。标准做法也是通过kafka-reassign-partitions脚本完成,不过用户可直接往Zookeeper中写数据来实现,比如直接把新增分区的副本集合写入到/brokers/topics/<topic>下,然后controller会为你自动地选出leader并增加分区7) 集群扩展 新增broker时Zookeeper中/brokers/ids下会新增znode,controller自动完成服务发现的工作8) broker崩溃 同样地,controller通过Zookeeper可实时侦测broker状态。一旦有broker挂掉了,controller可立即感知并为受影响分区选举新的leader9) ControlledShutdown broker除了崩溃,还能“优雅”地退出。broker一旦自行终止,controller会接收到一个ControlledShudownRequest请求,然后controller会妥善处理该请求并执行各种收尾工作10)Controller leader选举 controller必然要提供自己的leader选举以防这个全局唯一的组件崩溃宕机导致服务中断。这个功能也是通过Zookeeper的帮助实现的 如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令 controller与broker的通讯 当前controller启动时会为集群中所有broker创建一个各自的连接 假设你的集群中有100台broker,那么controller启动时会创建100个Socket连接(也包括与它自己的连接!)。当前新版本的Kafka统一使用了NetworkClient类来建模底层的网络连接(有兴趣研究源码的可以去看下这个类,它主要依赖于Java NIO的Selector)。Controller会为每个连接都创建一个对应的请求发送线程,专门负责给对应的broker发送请求。也就是说,如果还是那100台broker,那么controller启动时还会创建100个RequestSendThread线程。当前的设计中Controller只能给broker发送三类请求,它们是: 1)UpdateMetadataRequest:更新元数据2)LeaderAndIsrRequest:创建分区、副本以及完成必要的leader和/或follower角色的工作3)StopReplicaRequest:停止副本请求,还可能删除分区副本 Controller通常都是发送请求给broker的,只有上面谈到的controller 10大功能中的ControlledShutdownRequest请求是例外:这个请求是待关闭的broker通过RPC发送给controller的,即它的方向是反的。另外这个请求还有一个特别之处就是其他所有功能或是请求都是通过Zookeeper间接与controller交互的,只有它是直接与controller进行交互的。