学习的代码版本是0.8.2.1
原理
组件图
存储
目录结构
tmp$ tree kafka-logs/ kafka-logs/ |-- huying_test-0 | |-- 00000000000000000000.index | `-- 00000000000000000000.log |-- huying_test_ultimate-0 | |-- 00000000000000000000.index | `-- 00000000000000000000.log |-- huying_test_ultimate-1 | |-- 00000000000000000000.index | `-- 00000000000000000000.log |-- huying_test_ultimate-2 | |-- 00000000000000000000.index | `-- 00000000000000000000.log |-- huying_test_ultimate-3 | |-- 00000000000000000000.index | `-- 00000000000000000000.log |-- recovery-point-offset-checkpoint `-- replication-offset-checkpoint
一个目录代表一个(topic, partition)组合,在代码中是一个Log对象,每个Log包含多个LogSegment,一个LogSegment包含一个log文件(FileMessageSet)和一个index(OffsetIndex)
Log
+----------------+ |offset 8(bytes) | +----------------+ |messageSize 4 | +----------------+ |CRC 4 | +----------------+ |Magic 1 | +----------------+ |attributes 1 | +----------------+ |key 4 | +----------------+ |size 4 | +----------------+ |content | +----------------+ +----------------+ |offset 8(bytes) | +----------------+ |messageSize 4 | +----------------+ |CRC 4 | +----------------+ |Magic 1 | +----------------+ |attributes 1 | +----------------+ |key 4 | +----------------+ |size 4 | +----------------+ |content | +----------------+
OffsetIndex
+------------------+ |relativeOffset 4 | +------------------+ |positionInByte 4 | +------------------+
LogManager startup
1.遍历所有的日志目录,如果有.kafka_cleanshutdown就说明是干净的shutdown,跳过当前log的recovery
2.读取recovery-point-offset-checkpoint文件,返回(topic, partition)->offset 映射。其中文件的第一行是版本号,第二行是记录条数,后面是映射内容
3.计算出log目录和对应的recovery point,并用来初始化Log对象。
4.清理工作,清理掉.kafka_cleanshutdown
5.启动三个定时任务,kafka-log-retention, kafka-log-flusher, kafka-recovery-point-checkpoint
6.retention是删除时间过久,体积过大的log文件。flusher是根据时间间隔刷盘,调用FileChannel.force。recovery-point-checkpoint是把内存里的(topic, partition)->offset checkpoint结构刷盘。
7.启动LogCleaner
new Log()
1.删除掉.deleted和.cleaned后缀的文件
2.碰到.swap说明在swap中途server挂掉,这里的swap过程仅仅是一个rename。这种情况对于index的swap直接删除后面从log文件重建,如果是log的swap就先删index再通过rename的方式swap回log
3.遇到.index索引文件,如果没有对应的.log文件直接删除
4.通过.log文件生成LogSegment对象,如果没有索引文件就运行LogSegment.recover重建索引
5.重建索引的过程是以message为单位遍历LogSegment,遍历过程中如果累加的大小超过了index interval就在索引里面记录一下当前消息偏移量(会转化成相对偏移)和.log文件里面的字节为单位的位置。这个过程中会顺便把log文件和index文件末尾的可能因为crash产生的多余的字节给清理掉。
6.开始根据recovery point做恢复,这里碰到cleanShutdown文件就会跳过恢复
7.遍历从recovery point到末尾的LogSegment,调用LogSegment.recover,如果过程中发现多余的字符说明LogSegment非法,就把非法的一直到末尾的全部删掉。
8.最后对所有的index做一遍sanityCheck
Log.append
1.参数是MessageSet一次append一批消息
2.分析校验,拿到是否递增,压缩codec,以及校验大小,crc,干掉多余的字节
3.后面的所有过程都加上Log级别的锁,如果需要设置偏移量,就遍历每条消息,设置上偏移量
4.再做一次消息大小检查,因为前面可能重新压缩过消息
5.如果当前LogSegment大小加上要append的大小超过上限就滚动到新的segment
6.用当前Log的logEndOffset生成新的log file和index file,之前的末尾index文件做一下裁剪
7.用新的log file和index file生成新的LogSegment并加入Log里面的列表,并且把recoveryPoint到之前的logEndOffset的segment加入刷盘队列,刷完了以后recoveryPoint增长到logEndOffset
8.如果增长的字节量达到了需要做索引的长度就在index里面append一个entry,然后FileMessageSet.append
8.如果增长的字节量达到了需要做索引的长度就在index里面append一个entry,然后FileMessageSet.append
9.如果append以后没刷盘的字节过多就刷一下盘
10.一个有趣的点是这里并没有有mapped file但是在索引的地方用了,而rocketmq是主log文件也用了,个人理解是只有读也比较多的时候用这个才有价值,只是一次append然后一次读意义就不是很大
10.一个有趣的点是这里并没有有mapped file但是在索引的地方用了,而rocketmq是主log文件也用了,个人理解是只有读也比较多的时候用这个才有价值,只是一次append然后一次读意义就不是很大
Log.read
1.参数有一个startOffset,首先定位到offset仅仅比这个低的entry,如果定位不到就直接异常
2.调用LogSegment.read,首先要找到log文件里面对应startOffset的第一个合法的position
3.查找过程先通过index查找offset对应的position,查找的过程是标准二分查找,因为用了mmap所以都是在内存里面找。index因为需要经常的随机读和append操作,所以做了mmap
4.调用FileMessageSet.searchFor,在log文件里找大于offset的合法message的position
5.用同样的方法计算出end offset对应position,在算出需要读取的字节数
6.通过这些信息,返回一个FileMessageSet,这里还没有实际读取,后面会提取出byte message返回
LogCleaner
1.如果对一个消息存在key相同但是offset更高的消息,那么就清理掉这个offset低的消息
2.先找最脏的Log,找的方法是先读取cleaner-offset-checkpoint里面存了每个log的最后清理offset,然后每个log拿最后清理的offset到active offset所有segments的size和log size算一个比率,然后取比率最大的
3.开始清理Log,先遍历所有脏的LogSegment,构建一个内存中的OffsetMap(index),key是消息key,value是消息offset,这个map后面用来判断消息是否是重复的
4.把所有的LogSegment分个组,每个组的字节大小总量尽量接近于某个指定值。尽量把一个组压缩成一个新的Segment,这样做让新的Segment体积尽量平均
5.拿出每个组的第一个segment以它的名字创建log和index对应的.cleaned文件
6.对于组里的每个LogSegment,过滤掉重复的和空的message,其余的全部写入那个新的目标LogSegment
7.新的LogSegment刷盘,然后把旧的segment list全部删除,最后把.cleaned名字改成.log文件
8.为啥会有这种清理策略呢?其实是为了用topic保存位点做准备工作,保存位点的场景key比较固定只需要最后状态,所以用这招很好。如果是交易订单的场景就不合适了
网络协议
request and response
1.首先解析2字节的requestId
2.根据requestId选择kafka.api包里面对应的request类来处理,序列化反序列化协议自包含在对应的request类里面
3.有一个correlationId做request和response的关联
3.有一个correlationId做request和response的关联
4.拿ProducerRequest做例子
+----------------------------+ | requestId 2 | +----------------------------+ | versionId 2 | +----------------------------+ | correlationId 4 | +----------------------------+ | clientId | +----------------------------+ | requiredAcks 2 | +----------------------------+ | ackTimeoutMs 4 | +----------------------------+ | topicCount 4 | +----------------------------+ | +-------------------------+| | | topic || | +-------------------------+| | | partitionCount 4 || | +-------------------------+| | | +----------------------+|| | | | partition 4 ||| | | +----------------------+|| | | | messageSetSize 4 ||| | | +----------------------+|| | | | messageSetContent ||| | | | ||| | | +----------------------+|| | +-------------------------+| +----------------------------+
Message
+--------------+ |CRC 4 | +--------------+ |Magic 1 | +--------------+ |attributes 1 | +--------------+ |key 4 | +--------------+ |size 4 | +--------------+ |content | +--------------+
消息发送
1.发送分同步发送和异步发送,参见Producer.send,最终都会调用DefaultEventHandler.handle
2.发送消息之前如果topic metadata的缓存更新周期到了先随机找一台server拉一下数据更新brokerPartitionInfo
3.如果这条消息没key就和以前发送的partition相同或者随机选一个分区,如果有key就通过partitioner计算分区号,发送的broker选择leader broker
4.发送失败的消息会进行重试,有条件remainingRetries > 0 && outstandingProduceRequests.size > 0进行判断
5.ProducerSendThread里面有对异步消息批量发送的机制,通过数量多和超时两个条件判断是否该发送消息。可以选择是否要ack,有个配置项request.required.acks
6.server端KafkaApis.handleProducerOrOffsetCommitRequest,首先如果是offset commit request就转换成对offset topic发送消息的producer request
7.调用Partition.appendMessagesToLeader,把message保存到本地的文件系统。调用replicaManager.unblockDelayedFetchRequests把block住的fetch request处理一下符合条件的给response。delay fetch需要满足的条件在DelayedFetch.isSatisfied,主要是积攒一定量的消息
8.根据ack方式的设置,或者直接给response,或者生成DelayedProduce放到ProducerRequestPurgatory里面等后面满足条件的时候再ack。条件在DelayedProduce里面,主要是检查所有replica的end offset
9.保序和线程模型的问题,服务端是多个线程共同处理请求的,所以如果想保序的多条消息同时被服务端处理肯定乱序,但因为有ack机制,发送端发一条ack之后再下一条,这样在同一时间点就只能存在一条需要保序的消息就不会乱序了。
5.ProducerSendThread里面有对异步消息批量发送的机制,通过数量多和超时两个条件判断是否该发送消息。可以选择是否要ack,有个配置项request.required.acks
6.server端KafkaApis.handleProducerOrOffsetCommitRequest,首先如果是offset commit request就转换成对offset topic发送消息的producer request
7.调用Partition.appendMessagesToLeader,把message保存到本地的文件系统。调用replicaManager.unblockDelayedFetchRequests把block住的fetch request处理一下符合条件的给response。delay fetch需要满足的条件在DelayedFetch.isSatisfied,主要是积攒一定量的消息
8.根据ack方式的设置,或者直接给response,或者生成DelayedProduce放到ProducerRequestPurgatory里面等后面满足条件的时候再ack。条件在DelayedProduce里面,主要是检查所有replica的end offset
9.保序和线程模型的问题,服务端是多个线程共同处理请求的,所以如果想保序的多条消息同时被服务端处理肯定乱序,但因为有ack机制,发送端发一条ack之后再下一条,这样在同一时间点就只能存在一条需要保序的消息就不会乱序了。
接收消息
1.ZookeeperConsumerConnector主要处理consumer和zk的交互,/consumers/[group_id]/ids[consumer_id] -> topic1,...topicN 这个节点是consumer注册的临时节点并且存放了这个consumer订阅的所有topic,consumer会监听自己group里面其它consumer的变更,感觉是个坑,当一个group里面consumer很多的时候zk监听会非常的多
2.consumer会监听/brokers/[0...N],感觉也是坑。/consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id 记录partition的owner,rebalance的时候会重建
3./consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value 记录消费位点,还是感觉用专用的topic记录比较好
4.consumer启动api,Consumer.create(config)实际上启动了ZookeeperConsumerConnector,启动过程中如果配置了autoCommitEnable会启动一个scheduler定期扫描consumer对应的topic,partition并且提交位点
5. 调用ZookeeperConsumerConnector.createMessageStreams,首先在zk consume group的path下面注册client的临时节点,然后初始化3个zk listener ZKRebalancerListener,ZKSessionExpireListener,ZKTopicPartitionChangeListener
6. ZKSessionExpireListener不监听具体节点,在重连的时候重新注册client临时节点,触发loadBalancerListener.syncedRebalance。ZKTopicPartitionChangeListener监听/brokers/topics/huying_test,发生变化的时候触发rebalance,loadBalancerListener.rebalanceEventTriggered
7.ZKRebalancerListener监听/consumers/console-consumer-84505/ids节点,ZKRebalancerListener启动的时候会启动一个定时任务检查是否发生过rebalance事件,如果发生过就调用syncedRebalance。
8.rebalance的过程先检查broker,如果没有broker就注册一个listener。为了防止rebalance失败导致重复拉取,停止当前consumer上的ConsumerFetcher,清掉线程消费queue和KafkaStream里面的消息。接下来删除zk上的partition ownership信息,和内存里的注册
9.接下来调用PartitionAssignor.assign给当前consumer重新分配partition,有两种分配方式RoundRobinAssignor和RangeAssignor,RoundRobinAssignor先拿到所有的partition和consumer thread,然后分别排序再循环consumer thread去拿partition,但是只返回本consumer对应thread id对应的partition。RangeAssignor也比较简单,比如有10个partition,有5个consumer,当前consumer排序第二,那么就返回第三,第四个partition
10.对新的partition ownership创建临时节点并写入consumer thread id相关信息,针对partition调用ConsumerFetcherManager.startConnections,过程中会启动一个线程找zk拉取partition的leader信息,这个线程还会做一件事,addFetcherForPartitions启动ConsumerFetcherThread,ConsumerFetcherThread才是这正干活拉消息的
11.发消息使用的是SimpleConsumer,SimpleConsumer就是一个简单的blocking的rpc api封装。这里感觉kafka使用内存还是很残暴的,很多可以使用对象池和内存池的地方都没有做,new BoundedByteBufferSend,ByteBuffer.allocate(size)这样的代码随处可见。
12.KafkaStream的数据来自PartitionTopicInfo,有一个比较曲折的关联关系
2.consumer会监听/brokers/[0...N],感觉也是坑。/consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id 记录partition的owner,rebalance的时候会重建
3./consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value 记录消费位点,还是感觉用专用的topic记录比较好
4.consumer启动api,Consumer.create(config)实际上启动了ZookeeperConsumerConnector,启动过程中如果配置了autoCommitEnable会启动一个scheduler定期扫描consumer对应的topic,partition并且提交位点
5. 调用ZookeeperConsumerConnector.createMessageStreams,首先在zk consume group的path下面注册client的临时节点,然后初始化3个zk listener ZKRebalancerListener,ZKSessionExpireListener,ZKTopicPartitionChangeListener
6. ZKSessionExpireListener不监听具体节点,在重连的时候重新注册client临时节点,触发loadBalancerListener.syncedRebalance。ZKTopicPartitionChangeListener监听/brokers/topics/huying_test,发生变化的时候触发rebalance,loadBalancerListener.rebalanceEventTriggered
7.ZKRebalancerListener监听/consumers/console-consumer-84505/ids节点,ZKRebalancerListener启动的时候会启动一个定时任务检查是否发生过rebalance事件,如果发生过就调用syncedRebalance。
8.rebalance的过程先检查broker,如果没有broker就注册一个listener。为了防止rebalance失败导致重复拉取,停止当前consumer上的ConsumerFetcher,清掉线程消费queue和KafkaStream里面的消息。接下来删除zk上的partition ownership信息,和内存里的注册
9.接下来调用PartitionAssignor.assign给当前consumer重新分配partition,有两种分配方式RoundRobinAssignor和RangeAssignor,RoundRobinAssignor先拿到所有的partition和consumer thread,然后分别排序再循环consumer thread去拿partition,但是只返回本consumer对应thread id对应的partition。RangeAssignor也比较简单,比如有10个partition,有5个consumer,当前consumer排序第二,那么就返回第三,第四个partition
10.对新的partition ownership创建临时节点并写入consumer thread id相关信息,针对partition调用ConsumerFetcherManager.startConnections,过程中会启动一个线程找zk拉取partition的leader信息,这个线程还会做一件事,addFetcherForPartitions启动ConsumerFetcherThread,ConsumerFetcherThread才是这正干活拉消息的
11.发消息使用的是SimpleConsumer,SimpleConsumer就是一个简单的blocking的rpc api封装。这里感觉kafka使用内存还是很残暴的,很多可以使用对象池和内存池的地方都没有做,new BoundedByteBufferSend,ByteBuffer.allocate(size)这样的代码随处可见。
12.KafkaStream的数据来自PartitionTopicInfo,有一个比较曲折的关联关系
同步消息
1.在ReplicaManager.makeFollowers里面启动ReplicaFetcherThread找leader拉取消息
2.通过SimpleConsumer(一个简单的rpc封装)发送FetchRequest,拿到response之后append到本地的log,然后更新一下本地的highWaterMark
创建topic
1.客户端分配partition和replica到不同的broker上面。分配方式是对partition的第一份replica按round-robin的方式开始点是随机的一个broker,第二份和后面的replica会加上一个shift,下面是源码中给的例子
broker-0 broker-1 broker-2 broker-3 broker-4 p0 p1 p2 p3 p4 (1st replica) p5 p6 p7 p8 p9 (1st replica) p4 p0 p1 p2 p3 (2nd replica) p8 p9 p5 p6 p7 (2nd replica) p3 p4 p0 p1 p2 (3nd replica) p7 p8 p9 p5 p6 (3nd replica)
2.把topic信息和topic,partition和broker的关系写到zk上去
3.server端调用的发起点是监听了zk变更的TopicChangeListener,首先通过zk回调的数据和server缓存的数据差算出新增的topic
4.KafkaController.onNewTopicCreation,首先给topic注册AddPartitionListener,监听的路径是/brokers/topics/topicXXX,然后主动触发一次onNewPartitionCreation事件
5.进入PartitionStateMachine和ReplicaStateMachine的handleStateChange更新状态到NewPartition
6. PartitionStateMachine更新状态到OnlinePartition,在zk上初始化partition的leader和in sync replica的信息,zk path /brokers/topics/huying_test1/partitions/0/state data {"controller_epoch":6,"leader":0,"version":1,"leader_epoch":0,"isr":[0]} 这里有段注释需要注意,写zk可能会失败,当前机器可能会因为长gc失去和zk的session,所以后面catch一个zk node exist exception
7.对成为这个partition的leader的broker发送一个LeaderAndIsrRequest
8.负责管理发送的ControllerChannelManager会保存对每个broker的连接,发送也是先加到一个queue然后后面有一个线程异步发送
9. ReplicaStateMachine更新状态到OnlinePartition
10.broker收到LeaderAndIsrRequest调用ReplicaManager.becomeLeaderOrFollower,先判断controllerEpoch,如果比自己的小就丢弃这个请求,遍历请求中的每个partition,如果请求中对应partition的leaderEpoch小于当前partition的leaderEpoch就忽略当前partition
11.算出请求中partition对应的leader和follower的变更,对于变成leader的partition首先停止对这些partition的拉取job。
12.对每个partition调用Partition.makeLeader,对每个replica如果之前没有创建就创建一个,创建的过程是如果是远程的就只创建对象放在内存里,如果是local的就先创建一个Log,然后读取对应的highWatermark数据,最后生成Replica对象
13.highWatermark对应的replication-offset-checkpoint文件,和recovery point类似,但是recovery point是记录的刷盘的点,high water mark记录的是写入commit的点,所有的replica最小的commit offset就是high water mark。这里可能会增加high water mark
14.如果请求过来的topic是__consumer_offsets,那就启动OffsetManager的异步读。这个topic是用来管理所有的consumer的进度的,这样避免了把消费进度存zk上面影响扩展性。这个异步读会一直读取__consumer_offsets并把消息解码成消费进度放入缓存
15.对于需要变成follower的partition,如果是leader就调用Partition.makeFoller,首先如果本地的replica没有那就创建对应的Log,然后填充和清理一些本地内存结构
16.停止对这些成为follower的partition的拉取线程,把这些partition的Log截断到highWaterMark的位置,并启动对那些成为leader的partition的拉取线程
17.第一次创建topic的最后会启动highWaterMark的checkPoint线程,这个线程定期刷所有replica的checkPoint到磁盘
扩容
1.使用partition reassign tool
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }2.PartitionsReassignedListener监听/admin/reassign_partitions,PartitionsReassignedListener会触发ReassignedPartitionsIsrChangeListener监听/brokers/topics/huying_test1/partitions/0/state
3.触发点是PartitionsReassignedListener,对每个partition先判断新分配的分区是否已经分配好,是否有对应的broker是挂掉的,这两种情况直接退出。然后注册监听器ReassignedPartitionsIsrChangeListener,再后进入onPartitionReassignment
4.KafkaController.onPartitionReassignment 初始状态新分配的replica只要有not in sync的状态areReplicasInIsr的判断就是false,首先把旧的replica和新分配的replica的并集更新一下内存和zk
5.每个partition也有一个leaderEpoch,用于ReplicaManager判断是否是旧的leaderAndISRRequest,这个属性保存在zk的原因应该是controller需要知道leaderEpoch信息
6.先更新一下zk上的leaderEpoch然后给并集所有的broker发送一个LeaderAndIsr来调整leader
7.把差集的replica设置成NewReplica的状态,这个过程也会发送LeaderAndIsr请求
8.这个过程可能会触发ReassignedPartitionsIsrChangeListener,会先判断这个partition是否结束了reassignment,然后判断是否所有reassign的replica都已经追上(isr),如果追上就再次进入onPartitionReassignment
9.把reassignedReplicas在replicaStateMachine里面转换成Online的状态,在内存里面把原来的并集替换成reassignedReplicas,如果leader不在reassignedReplicas里面通过partitionStateMachine发起一个新的partition leader选举,如果leader在的机器挂了也要重新选举,否则就只是发送请求更新一下leaderEpoch
10.把差集replica下线,下线是通过replicaStateMachine做的,依次做了几个状态变更OfflineReplica->ReplicaDeletionStarted->ReplicaDeletionSuccessful->NonExistentReplica过程中可能会停止replica fetch的线程和删掉Log
11.先把reassignedReplicas更新一下zk,然后更新一下/admin/reassign_partitions的内容,取消掉对应的ReassignedPartitionsIsrChangeListener,最后给每个broker发送一个更新metadata的请求
12.感觉这个流程主要做的是replica调整,并没有涉及到停写,和consumer消费完流程的控制,过程中可能会出现消息乱序。update,这里还真不需要,kafka一个topic多搞几个队列,扩容的时候每个机器assign的队列少一点就实现自动扩容了,缺点是文件多了,写性能可能会下降。rocketmq因为是双写而不是isr,所以扩容就需要停写和消费完。
高可用
high water mark
1.high water mark通常是一个partition所有replica的endOffset的最小值,也是同步提交模式情况下集群commit的位置
2.读的时候客户端最多只能读到highWatermark的位置
3.一个follower挂掉重启的时候首先扔掉它自己highWatermark之后的数据,然后开始追赶leader
4.leader挂掉的会重新选,新leader把自己的endOffset当做新的highWatermark,然后让其它的replica开始追赶
in sync replica
1.Partition会启动一个超时线程,调用Partition.maybeShrinkIsr,检查所有isr的end offset和最后更新时间,差距过大的就踢掉
2.处理follower的fetch request的时候可能会触发Partition.updateLeaderHWAndMaybeExpandIsr,重新检查这个replica的各种条件,可能的话就加回到isr
partition leader election
1.PartitionLeaderSelector.OfflinePartitionLeaderSelector是用得最多的leader selector,选leader的逻辑是从controller context里面拿出live isr和live replica,优先选择live isr的第一个,如果没有就选择live replica的第一个,否则就抛异常
2.更新zk path /brokers/topics/huying_test1/partitions/0/state
3.给这个partition的每个replica broker发一个请求LeaderAndIsrRequest,其中leader和follower的指令稍有区别。然后给所有活着的broker发一个请求UpdateMetadataRequest。这里的请求都是异步带callback(多数情况是null)的
4.发送请求如果失败会重试,终止重试的流程是zk那边发起,导致当前发送线程被关闭
5.接收到请求的broker开始ReplicaManager.makeLeaders, makeFollowers相关的流程
6.处理UpdateMetadataRequest请求比较简单,更新一下cache数据就ok了
7.触发点,KafkaController.onBrokerStartup, KafkaController.onBrokerFailure分别对应broker启动的时候和broker挂掉的时候,KafkaController.onPreferredReplicaElection,使用了工具
bin/kafka-preferred-replica-election.sh --zookeeper localhost:12913/kafka --path-to-json-file topicPartitionList.json
这个工具的作用是重新平衡partition leader的位置,因为broker的各种failover可能partition的leader会飘到一台机器上面去,这个工具可以使leader重新平衡。工作的原理是监听/admin/preferred_replica_election节点,触发一次partitionStateMachine.handleStateChanges,每个partition都选第一个replica当leader,由于之前分配topic,partition的时候已经在broker上面做了平衡所以这个时候leader的分配也是平衡的
controller leader election
1.controller是kafka全局的leader,controller负责监听zookeeper然后分析状态变更发送命令到其它的broker
2.broker启动的时候开始监听/controller节点,这里注意一个zk的bug(session失效可能过了一段时间临时节点才会删除,如果碰到node exist exception就返回可能会都选不上) LeaderChangeListener可能会触发KafkaController.onControllerFailover或者KafkaController.onControllerResignation
3.onControllerFailover首先会读取zk上的epoch,然后+1乐观锁方式更新zk,epoch用来标识正确的leader,当新leader产生旧leader没挂掉的时候可以作废掉旧leader的命令,broker里面关注epoch的manager保存一份epoch,接到比自己大的epoch就设定成新的
4.监听几个zk path,/admin/reassign_partitions,/admin/preferred_replica_election,/brokers/topics,/admin/delete_topics,/brokers/ids,/brokers/topics/topicXXX
5.初始化controllerContext相关的内存结构和一些manager,并给其它的broker发一个metadata更新的命令
6.如果配了自动平衡partition leader,就启动一个检查并自动平衡partition leader的scheduler,调用checkAndTriggerPartitionRebalance,不平衡率高的时候触发onPreferredReplicaElection
7.onControllerResignation执行相反的过程,取消注册之前的几个zk listener并且关闭几个manager
3.onControllerFailover首先会读取zk上的epoch,然后+1乐观锁方式更新zk,epoch用来标识正确的leader,当新leader产生旧leader没挂掉的时候可以作废掉旧leader的命令,broker里面关注epoch的manager保存一份epoch,接到比自己大的epoch就设定成新的
4.监听几个zk path,/admin/reassign_partitions,/admin/preferred_replica_election,/brokers/topics,/admin/delete_topics,/brokers/ids,/brokers/topics/topicXXX
5.初始化controllerContext相关的内存结构和一些manager,并给其它的broker发一个metadata更新的命令
6.如果配了自动平衡partition leader,就启动一个检查并自动平衡partition leader的scheduler,调用checkAndTriggerPartitionRebalance,不平衡率高的时候触发onPreferredReplicaElection
7.onControllerResignation执行相反的过程,取消注册之前的几个zk listener并且关闭几个manager
broker启动和关闭
1.broker启动的时候注册SessionExpireListener,这里调用的是zkClient.subscribeStateChanges(sessionExpireListener)而不是监听zk临时节点,感觉碰到机器假死会少了删临时节点的手段。然后注册临时节点/brokers/ids/xxx,这个临时节点会被controller的BrokerChangeListener监听
2.KafkaController.onBrokerStartup 首先给新启动的broker发送metadata
2.KafkaController.onBrokerStartup 首先给新启动的broker发送metadata
3.给新broker上的所有replica触发OnlineReplica的状态变更,会触发broker上面的replicaManager.becomeLeaderOrFollower,具体的逻辑和前面创建topic里面描述的相同
4.触发partitionStateMachine的状态变更,可能会导致leader重新选举,同时也会触发broker上面的replicaManager.becomeLeaderOrFollower
5.如果在新的broker上有partition reassignment,就调用onPartitionReassignment
6.broker关闭的逻辑类似,也是触发partitionStateMachine和replicaStateMachine的状态变化,稍有不同的是那些leader在这台broker上的partition要先触发一个Offline再触发一个Online
rebalance
脑裂问题和惊群效应
1.因为网络延时和不同的客户端连的zk server可能不同,不同客户端看到的zk状态可能不完全一致导致rebalance的时候的判断可能不太准确zk different client view,导致rebalance因zk ownership冲突而失败。有一个bug,[KAFKA-242]比较快的连续调用ConsumerConnector.createMessageStreams,可能会导致位点不正确前置导致丢消息,看起来发生在前一次ConsumerConnector.createMessageStreams导致rebalance结束前又调用了一次ConsumerConnector.createMessageStreams
2.一个broker或者consumer的变化可能会导致所有的consumer的rebalance。0.8版本用consumer thread id排序然后重新分配消费,中间插入一个consumer可能会导致很多consumer的分配变化,而且每次rebalance都是要关闭消费者(包括关连接和清queue),相当重。0.9 group reassignment 也是用的roundrobin 但是rebalance不会直接关闭连接
kafka 0.9的解决方案
1.增加了kaka.coordinator这个新包处理consumer的协调问题,GroupCoordinator是broker端处理所有group api请求的核心类。0.9包的客户端只有java版的
2.当一个broker变成__consumer_offsets的某个partition的leader的时候会异步的读取__consumer_offsets里面的消费位点和consume group metadata并缓存起来
3.KafkaApis里面ListGroupsKey,DescribeGroupsKey很简单就是取一下cache,HeartbeatKey也简单正常情况就是给个response然后schedule下一次heartbeat
4.SyncGroupKey,如果coordinator处于Dead,PreparingRebalance状态都是简单的不做处理,处于Stable也仅仅只是触发一下下一次心跳,处于AwaitingSync状态只处理leader client(加入group的第一个client)的请求,把当前的groupMetadata和assignment数据序列化成一条消息发到topic里面去。发送失败重新触发rebalance,发送成功本地保存一下assignment并且把状态转成stable。这里的回调看起来比较复杂,但实际上只是把返回状态码处理的逻辑传来传去
5.JoinGroupKey,加入group前先有一个简单的协议验证,协议在ConsumerProtocol比较简单,有一些版本号,topic,partition之类的信息。实际处理逻辑在GroupCoordinator.doJoinGroup进行,如果coordinator处于PreparingRebalance状态,收到请求增加或者更新一下group member然后触发下一次rebalance,处于AwaitingSync状态,如果碰到不在group里面的client那接添加member触发下一次rebalance,如果是已知的client先进行一次protocol比较也就是比较metadata,如果没有变化那就加入成功给response,否则就认为metadata变化了会更新metadata触发下一次rebalance。处于Stable状态,碰到新client或者client metadata变化都会促发rebalance。如果coordinator处于PreparingRebalance状态在最后会触发joinPurgatory.checkAndComplete把之前delay的操作刷掉。触发rebalance是往joinPurgatory里面丢一个DelayedJoin操作,DelayedJoin在tryComplete时会检查是否group里面的client都加入了,如果是会触发GroupCoordinator.onCompleteJoin,onCompleteJoin先移除失败的client,然后把generationId +1,选择一个protocol(取交集的第一个),把状态改成AwaitingSync,最后给每个client返回JoinGroupResult(有generationId,leaderId和选择的protocol)
6.LeaveGroupKey,先移除心跳配置,然后根据coordinator状态可能触发一次rebalance
7.GroupCoordinatorKey,读取group coordinator信息,首先读取topic __consumer_offsets的metadata,根据groupId算出group metadata应该在哪个partition里面,读取对应partition的partitionMetadata里面的leader信息这个leader就是group coordinator。比较重要的问题就是谁是coordinator,GroupMetadataManager.isGroupLocal->ownedPartitions.add(offsetsPartition)->loadGroupsForPartition->handleGroupImmigration->handleLeaderAndIsrRequest,所以__consumer_offsets这个topic对应的partition的leader就是group coordinator
8.KafkaConsumer每次拉消息的时候会先确保拿到server端coordinator的信息,过程中可能会发送GroupCoordinatorRequest。检查一下consumer是否在group内,如果不在就发送JoinGroupRequest,拿到response之后会发送SyncGroupRequest,前面的response会告诉consumer是不是leader,如果consumer是leader,会先调用performAssignment(这部分逻辑在WorkerCoordinator源码在外面)进行partition-consumer分配,request里面会带上group assignment,这些做完以后就实际拉数据了,比较简单
9.consumer的heartbeat启动在AbstractCoordinator.HeartbeatTask.reset,也比较简单,heartbeat失败就标记coordinator挂掉了,碰到ILLEGAL_GENERATION设置状态rejoinNeeded,这个状态位会使得消费的时候先停住发送JoinGroupRequest
10.流程串起来,consumer首先给自己知道的broker发请求得知coordinator地址,consumer通过心跳保持和coordinator的联系,如果心跳返回IllegalGeneration就意味着要rebalance,下一次消费消息会block住并且发出JoinGroupRequest,收到响应后如果是leader consumer会进行partition分配,接下来发送SyncGroupRequest,leader的request会让coordinator进入stable状态,follower会从coordinator那里拿到partition assignment,这个做完以后才开始拉新的partition
11.coordinator是__consumer_offsets这个topic的partition所在的leader机器,每次partition leader选举的时候新leader会从本地的log文件读取到GroupMetadata做缓存,consumer挂掉的时候coordinator把它移出group并且触发rebalance,coordinator通过IllegalGeneration通知consumer发生rebalance并且只有收到所有的consumer的JoinGroupRequest之后coordinator才会给consumer响应JoinGroupResponse,结束rebalance。coordinator也会监听topic partition的变化,必要的时候触发rebalance。consumer join group的时候会被coordinator分配一个consumer id之后的每次HeartbeatRequest和OffsetCommitRequest请求都带consumer id,如果哪次heartbeat请求或者commit offset请求的consumer id不对,coordinator会返回一个UnknownConsumer。
4.SyncGroupKey,如果coordinator处于Dead,PreparingRebalance状态都是简单的不做处理,处于Stable也仅仅只是触发一下下一次心跳,处于AwaitingSync状态只处理leader client(加入group的第一个client)的请求,把当前的groupMetadata和assignment数据序列化成一条消息发到topic里面去。发送失败重新触发rebalance,发送成功本地保存一下assignment并且把状态转成stable。这里的回调看起来比较复杂,但实际上只是把返回状态码处理的逻辑传来传去
5.JoinGroupKey,加入group前先有一个简单的协议验证,协议在ConsumerProtocol比较简单,有一些版本号,topic,partition之类的信息。实际处理逻辑在GroupCoordinator.doJoinGroup进行,如果coordinator处于PreparingRebalance状态,收到请求增加或者更新一下group member然后触发下一次rebalance,处于AwaitingSync状态,如果碰到不在group里面的client那接添加member触发下一次rebalance,如果是已知的client先进行一次protocol比较也就是比较metadata,如果没有变化那就加入成功给response,否则就认为metadata变化了会更新metadata触发下一次rebalance。处于Stable状态,碰到新client或者client metadata变化都会促发rebalance。如果coordinator处于PreparingRebalance状态在最后会触发joinPurgatory.checkAndComplete把之前delay的操作刷掉。触发rebalance是往joinPurgatory里面丢一个DelayedJoin操作,DelayedJoin在tryComplete时会检查是否group里面的client都加入了,如果是会触发GroupCoordinator.onCompleteJoin,onCompleteJoin先移除失败的client,然后把generationId +1,选择一个protocol(取交集的第一个),把状态改成AwaitingSync,最后给每个client返回JoinGroupResult(有generationId,leaderId和选择的protocol)
6.LeaveGroupKey,先移除心跳配置,然后根据coordinator状态可能触发一次rebalance
7.GroupCoordinatorKey,读取group coordinator信息,首先读取topic __consumer_offsets的metadata,根据groupId算出group metadata应该在哪个partition里面,读取对应partition的partitionMetadata里面的leader信息这个leader就是group coordinator。比较重要的问题就是谁是coordinator,GroupMetadataManager.isGroupLocal->ownedPartitions.add(offsetsPartition)->loadGroupsForPartition->handleGroupImmigration->handleLeaderAndIsrRequest,所以__consumer_offsets这个topic对应的partition的leader就是group coordinator
8.KafkaConsumer每次拉消息的时候会先确保拿到server端coordinator的信息,过程中可能会发送GroupCoordinatorRequest。检查一下consumer是否在group内,如果不在就发送JoinGroupRequest,拿到response之后会发送SyncGroupRequest,前面的response会告诉consumer是不是leader,如果consumer是leader,会先调用performAssignment(这部分逻辑在WorkerCoordinator源码在外面)进行partition-consumer分配,request里面会带上group assignment,这些做完以后就实际拉数据了,比较简单
9.consumer的heartbeat启动在AbstractCoordinator.HeartbeatTask.reset,也比较简单,heartbeat失败就标记coordinator挂掉了,碰到ILLEGAL_GENERATION设置状态rejoinNeeded,这个状态位会使得消费的时候先停住发送JoinGroupRequest
10.流程串起来,consumer首先给自己知道的broker发请求得知coordinator地址,consumer通过心跳保持和coordinator的联系,如果心跳返回IllegalGeneration就意味着要rebalance,下一次消费消息会block住并且发出JoinGroupRequest,收到响应后如果是leader consumer会进行partition分配,接下来发送SyncGroupRequest,leader的request会让coordinator进入stable状态,follower会从coordinator那里拿到partition assignment,这个做完以后才开始拉新的partition
11.coordinator是__consumer_offsets这个topic的partition所在的leader机器,每次partition leader选举的时候新leader会从本地的log文件读取到GroupMetadata做缓存,consumer挂掉的时候coordinator把它移出group并且触发rebalance,coordinator通过IllegalGeneration通知consumer发生rebalance并且只有收到所有的consumer的JoinGroupRequest之后coordinator才会给consumer响应JoinGroupResponse,结束rebalance。coordinator也会监听topic partition的变化,必要的时候触发rebalance。consumer join group的时候会被coordinator分配一个consumer id之后的每次HeartbeatRequest和OffsetCommitRequest请求都带consumer id,如果哪次heartbeat请求或者commit offset请求的consumer id不对,coordinator会返回一个UnknownConsumer。
高性能
1.磁盘顺序写速度很快,pagecache的存在加速读写
2.攒消息成message set,网络上传大包,更大块的磁盘操作,内存连续性更好
3.协议固定,消息字节在broker,consumer,producer之间传输不需要修改
4.字节从pagecache到socket在linux可以通过sendfile优化
小技巧
1.SkimpyOffsetMap OffsetMap的开放式寻址实现,省内存,不能删除。
2.Throttler 一个小小的限流器,通过sleep限流。
3.多层结构的TimingWheel,解决了简单TimingWheel范围比较小的缺点。
3.多层结构的TimingWheel,解决了简单TimingWheel范围比较小的缺点。
ZK技巧
broker先通过zk选出一个leader,然后只有leader监听zk上面的partition变化,监听到变化以后leader会决定是否给其它broker发送命令,这种结构zk的压力就不会增长得太厉害
ZK结构
截了一个简单的,有些需要流程触发的节点没有
/consumers /consumers/console-consumer-84505 /consumers/console-consumer-84505/offsets /consumers/console-consumer-84505/offsets/huying_test /consumers/console-consumer-84505/offsets/huying_test/3 /consumers/console-consumer-84505/offsets/huying_test/2 /consumers/console-consumer-84505/offsets/huying_test/1 /consumers/console-consumer-84505/offsets/huying_test/0 /consumers/console-consumer-84505/owners /consumers/console-consumer-84505/owners/huying_test /consumers/console-consumer-84505/owners/huying_test/3 /consumers/console-consumer-84505/owners/huying_test/2 /consumers/console-consumer-84505/owners/huying_test/1 /consumers/console-consumer-84505/owners/huying_test/0 /consumers/console-consumer-84505/ids /consumers/console-consumer-84505/ids/console-consumer-84505_huyingdeMacBook-Air.local-1460795322590-79d6b9c5 /config /config/topics /config/topics/huying_test /config/changes /controller /admin /admin/delete_topics /brokers /brokers/topics /brokers/topics/huying_test /brokers/topics/huying_test/partitions /brokers/topics/huying_test/partitions/3 /brokers/topics/huying_test/partitions/3/state /brokers/topics/huying_test/partitions/2 /brokers/topics/huying_test/partitions/2/state /brokers/topics/huying_test/partitions/1 /brokers/topics/huying_test/partitions/1/state /brokers/topics/huying_test/partitions/0 /brokers/topics/huying_test/partitions/0/state /brokers/ids /brokers/ids/0 /zookeeper /zookeeper/quota /controller_epoch
scala爽和不爽
ret.get(leaderBrokerId) match { case Some(element) => dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] case None => dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] ret.put(leaderBrokerId, dataPerBroker) }模式匹配,在接收请求的时候用起来特别爽
brokerIds.map { brokerId => partitionReplicaAssignment .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) } .map { case(topicAndPartition, replicas) => new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) } }.flatten.toSetserver端编程大量集合操作的时候这种高阶流式处理集合比java方便太多
val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)这里用到了Stream的lazy evaluation,制造了一个无限循环的队列,cool!
不爽的地方
1.debug的时候ide支持还是不够,有些表达进不去或者看不到值
2.有些写法写起来爽读起来痛苦,比如这样的
val streams = e._2.map(_._2._2).toList
发送端性能问题
Sarama假异步
sarama客户端本身是支持异步发送模型的,但是sarama老版本实际上做的假异步https://github.com/Shopify/sarama/issues/2103
服务端请求处理
1.网络层将请求投递到 RequestChannel
2.多线程的 KafkaRequestHandler 并发的从 RequestChannel 获取请求, 交给 kafkaApis 处理
这里看起来会有顺序问题,但是processCompletedReceives里面会调用selector.mute禁读,所以服务端对同一个连接上的读请求是一个一个顺序处理的过程。这个时候在tcp buffer里面是可以缓存更多的请求的,通过这个buffer可以平摊掉客户端到服务端的网络延迟
WaitForAll
因为服务端对同一个连接上的请求是顺序处理的,所以WaitForAll的时候broker间数据复制的延迟会让服务端处理一个请求的耗时变长而这个耗时是没法通过异步的方式化解的。batch还是有可能平摊掉多个请求顺序处理的时间,但如果slave broker的io有问题batch效果可能也不明显?