2016年8月16日星期二

分布式和大数据方案备忘

扩容


  1. 分布式缓存(tair) 一致性hash存储,2台扩3台,a,b,a,b,a,b -> a,b,c,b,a,c 只用全量迁移2台机器的数据,成本最低
  2. 分库分表数据库 (1)先做全量迁移并记录迁移之前老库的位点 (2)全量迁移是整表迁移,这也是要做分表的原因之一 (3)全量做完以后开始做增量 (4)增量追上以后把路由规则切到新库
  3. kafka 两种扩容,如果分区开的多只扩机器不扩分区这种可以做到自动迁移,等kafka新机器的isr追上进度即可提供服务。如果扩了分区数,为了保证顺序,必须停写,然后等消费端消费完毕才能扩容
  4. rocket mq扩容,目前对于顺序消息必须停写等消费端消费完毕,因为rocket mq的高可用方案是双写,新机器加进来以后没有isr的方案帮助新机器拿到队列里面的数据
  5. 分布式文件系统 有name server存在,name server会做动态负载均衡,所以新机器加进来即可自动完成扩容和后面的数据均衡
  6. pulsar扩容,在kafka的基础上走一步,把kafka的segments加上元数据管理让一个分区的segments能分散在不同机器上就能做到全自动的迁移和扩容,pulsar的另一个重要特点是存储计算分离原来的broker拆成不存数据的broker和存数据的bookie。但是zk的依赖好像依然是个问题。

热点解决方案

tair hot ring

  1. https://yq.aliyun.com/articles/746727
  2. 基本思路是hash冲突时尽量减少爬链表的过程,把最热的放到最前,这个移动过程可以通过采样来发动
  3. 如果移动链表元素动作较多,并发冲突较多,所以采用ring结构只用移动head指针即可
  4. ring结构转一圈后无法判断是否跑完了,所以要对key排序,这样找到比当前key大的就说明再也找不到了,为了加速比较过程,还在key前面加了tag
  5. 扩容的时候根据范围拆分,原来tag的范围是0-t,拆分成0-t/2和t/2-t,header也分成两个,也就是个标准的rehash手段

hash+range二级散列方案

Hash(a,b)=Range(Hash(a),Hash(b))
有区分度而且保留了一定的范围查询的能力,https://zhuanlan.zhihu.com/p/424174858

一致性

  1. 健康检查类型的数据传播,仅要求最终一致性,所以可以用gossip的方式传播,实现简单
  2. 配置型信息存储,顺序很重要,所以需要用paxos或者raft类型的协议存储


megastore

  1. entity group类似于分表,但是每个数据中心的entity group是相同的。一个entity group内部是强一致事务,跨entity group可以用2pc做强一致事务但推荐的方式是用消息队列做弱一致事务,论文里描述的一个场景是a邮箱给b邮箱发邮件
  2. 高可用性是通过在多个数据中心做paxos写来实现,这种方式的优点是没有master,省去可很多运维上的麻烦。写数据的流程是(1)读取WAL的最后提交位置(2)把要写的数据攒成一个log entry(3)paxos提交(4)把数据合并到big table里面的数据行和索引(5)清理临时数据
  3. 存储和查询基本按关系数据库的做法,但是有主从表的概念,比如user和photo,user是主表photo是从表,两个表相关联的数据在物理上也存在一起
  4. bigtable可以把同一个cell的数据根据时间戳多版本存储,megastore通过这个特性实现mvcc
  5. 有一个coordinator的角色负责记录本地所有paxos写成功的表,通过这个信息尽量保证本地读。这里细节没完全懂,既然是多写,为啥还需要这个角色?
  6. paxos write做了优化,每一轮写都会选一个leader做裁决,选leader的原则是leader所在的区域发起的写最多。具体流程是(1)找leader接受要写的数据如果接受就跳过步骤2(2)选一个最高的sequence,并且把要写的数据更新到最新状态(3)让其它的replica接受更新,如果多数接受就成功,否则重做步骤2(4)invalidate coordinator(5)所有replica把数据更新到本地bigtable

基于json的架构1


1.存储基于mysql,每个表只有两个字段id和json块

2.在mysql里面写,在搜索引擎里面读,mysql和搜索引擎基于binlog同步

3.mysql和搜索引擎上面有一个统一数据层负责读写的分发

4.所有领域对象都是json表示,有一个配置中心负责配置规则做校验和搜索引擎的索引字段

5.服务层上面有一个路由和校验层,根据调用的json串做路由

6.读写延迟是个比较大的问题,无法一致性读导致复杂的业务逻辑可能会比较难做,对数据同步工具要求也非常高


uber schemaless


1.存储基于mysql,cell是基本单元可以是一个json比如乘车基本信息这样的,row_key,column_name,ref_key决定一个cell。一个row可以有多个cell而且不用事先指定。整个表类似于一个三维hashtable row_key->column_name->cell{key1:value1, key2:value2}

2.对cell的操作是基本操作,只能insert不能update或delete,对cell的insert是高版本ref_key覆盖低版本ref_key这样完成update。操作基于cell级别可以有效减少写入数据体积

3.分片存储,每个分片是一个独立的mysql数据库,每个库有一张表存cell实体,每个二级索引用一张表存储,还有一些辅助表。每个cell是实体表的一行,每一行有added_id,row_key,column_name,ref_key,body,created_at,其中added_id是给后面的trigger排序用的,body数据存成压缩的json。这种存储结构是拿多个mysql row拼成一个schemaless的一个逻辑row

4.trigger类似于精卫,有failover,重投这些行为,通过trigger实现异步事件总线,完成类似于行程结束之后付款这样的操作

5.二级索引可以建到cell里面的属性上面,而且支持联合索引,这个感觉是靠索引表实现的。索引也要求有分片字段,索引和cell可能不在一个分片,只做到最终一致,有可能读到旧数据

6.cell存储根据row_key分片,每个存储节点一主两从,尽量读主节点,主从异步复制,读主节点失败会转去读从节点。写主节点失败,会随机找其他分片的主节点写入,但是会告诉客户端这个写入目前不可读,这个叫缓冲写入

7.缓冲写入,写主分片主节点之外会随机挑一个分片的主节点做备份写,在备份分片上的写就是缓冲写,如果主分片主备复制完毕,从节点上出现了新写的数据,worker机器就会把备份分片上的缓冲写数据删掉。相当有趣的解决mysql主备复制问题的方案

8.这个方案有高可用性,扩展性也不错,查询也可以跨维度,感觉比前面的基于json的方案牛逼


缓存使用进化


1.tomcat服务端使用业务对象缓存,web框架对静态页面使用page缓存,浏览器和nginx缓存图片和js这些。缺点是还要走servlet这些,java处理耗cpu,容易到上限

2.动静分离,nginx,缓存服务部署在同一台机器上,静态的数据走缓存,动态的数据由客户端js做csi回源。请求由lvs按一致性hash打进来提高命中率。缺点是管理麻烦,不同的机器间没有互通。

3.统一缓存层,缓存服务单独部署集群,回源采用esi,由缓存服务器找java server回源,这里应该是走比较快的协议。优点是管理方便,而且缓存数据统一互通。失效策略可以有定时,根据tag计算,异步业务消息和监听binlog

4.cdn,先通过dns做路由到最近的cdn节点,cdn如果拿不到数据就找统一缓存层,统一缓存层对动态数据做回源



hbase,lsm tree,列存,kv-store,sstable,skip-list,bloom-filter


1.kv-store是键值对存储,sstable是一种实现方式,特点是按key排序存在内存或者文件里

2.sstable如果没有其它措施读写都会很慢,所以首先有了lsm tree,内存里面L0,然后文件里面L1,L2...一般越来越大. lsm tree一般是写内存,超出大小以后批量merge到磁盘,所以写性能不错

3.说是lsm tree但也可能具体的tree是skip list, skip list加上随机化层数后平衡性不错而且并发的版本加锁比较少,缺点是比较耗内存。skip list一般放内存中,磁盘上还是btree比较好

4.随机查询还是慢,所以用bloom-filter来加速在hfile里面的寻址,通过几个hash函数来快速过滤掉一些不可能有目标数据的hfile

5.hbase的存储建立在上面这些之上,region是分布式存储的最小单元,一个region只会在一台server上,region如果大了就会分裂,region在hdfs上面是个目录。每个列簇对应一个store,一个store里面有一个mem-store和一些文件,这些文件也放在一个目录里面,列簇里面的内容按rowKey排序存在一起,region里面可以有多个store

6.rowKey col1 col2 col3应该是存成 rowKey col1, rowKey col2, rowKey col3 这样

7.hbase称作面向列的存储,存的是列族,但是一般实践都是只有一个列族,列族内的列可以随意修改所以hbase实际上还是行存储。hbase的优势是自动分片,按rowkey查询快,写入快,只能算高级kv数据库。hbase的version一个是确实有场景用得到多版本的数据,另一个是可以用来做mvcc

8.hbase的事务仅仅是针对单行,对于不同行以及单行分布在不同region的情况都无法加锁。hbase有单行的行锁,而且对于单行的读和写可以通过mvcc保证并行度

9.hbase这种存k-v的叫面向列的存储,这种存储的好处是本质上是k-v store可以很灵活的动态添加列,某些列可以很稀疏不占空间,而且可以做lsm-tree结构。kudu这种光存value然后靠偏移量去定位,优点是可以做高压缩比,范围scan很快,这种结构一般要求schema固定有了类型才好算偏移量选择压缩算法,这种结构感觉做lsm-tree也超复杂

hbase row-key

1.hbase region拆分方式是按row-key的range进行拆分
2.row-key太长的缺点是消耗内存和磁盘空间,sstable方式决定
3.row-key简单单调递增会带来当前region读写热点问题以及之前拆分的region很少有机会再被访问的问题
4.热点的解决方案(1)预分区(2)随机散列,缺点是再也无法范围扫描,根据row-key的部分条件扫描也无法弄(3)加hash运算出来的前缀,优点是还可以范围扫,缺点是每次扫都是多机器而且代码稍麻烦
5.row-key可以存字段信息,并且当做查询条件,比如<userId>-<date>-<messageId>这种组合然后查userId 00005 的数据,支持左前缀。这种方式感觉消耗存储而且row-key设计的不好以后不好改
6.还有一种比较诡异的玩法,因为hbase能随意添加列,所以对于用户收到的邮件这种聚合,可以一行,每封邮件一列但是列名不同,通过列名做索引

Cassandra

1.数据存储模型也是bigtable的模型,map<row-key, map<column-key,value>>
2.memtable+sstable+commitlog
3.分片方式是一致性hash,节点的状态是通过gossip协议点对点传播,所以没有了master-server这样的角色,少了单点问题,也不会有hbase region分裂时候的可用性问题,tidb通过raft+状态机解决分裂时的可用性问题
4.高可用的方案是wrn,优点是比较灵活可配置
5.cassandra没有使用vector clock,原因在于本质上cassandra是kv型数据而dynamo是文档型数据,所以cassandra可以用update a=xxx where b=yyy这样的方式单独的更新属性,这样大多数merge就可以避免了

Dynamo

1.大模型上和Cassandra非常相似,也是一致性hash分片,节点的状态也是gossip协议传播,也使用了wrn
2.高可用上面有两招,临时的不可用会找另外一台机器做暂存,等恢复以后再把数据拷贝回来。如果机器挂掉时间比较长,会异步数据恢复,副本之间的数据不一致快速检测和恢复使用的数据结构是Merkle Tree
3.vector clock,某些节点宕机恢复以后或者网络发生partition的时候可能出现版本分叉,比如版本1的基础上出现了版本2和版本3两个不同的修改,客户端读到这个分叉版本的时候需要自己去裁决选择一个版本或者合并,然后再把新版本写回去
4.Dynamo的特点是比较简单轻量,最终一致

td-sql

1.要解决的是mysql异步主备同步可能丢数据的问题
2.最原始的是主挂了停用,捞业务日志补偿备
3.k-v存储加黑名单,主挂了只是没有同步好的黑名单不可用
4.td-sql,一主两备为一组,写主只有binlog到达一备才算写成功,主挂了按binlog最后更新时间选新主

Elasticsearch

1.任意维度的联合查询比mysql强大很多很多
2.term-index(前缀树)->term-dictionary(排序)->position-list
3.多条件合并也就是多个position-list合并,可以通过在position-list上面建skip-list然后合并skip-list,或者直接内存bitset合并
4.skip-list的一种压缩方式是记录delta,bitset的一种压缩方式是模一个65536然后记录商和余数
5.在聚合查询上有和mysql类似甚至更好的能力,因为es又引入了doc_values

6.doc_values是正排索引+列存,比如查年龄18岁的平均身高,先用倒排查到18岁的所有doc_id,再拿doc_id到正排索引里面去匹配然后计算,因为正排是列存的而且还压缩了的所以一次性load进内存进行比较,这样想着感觉好牛,还没完全想明白,要比较详细的了解一下底层的数据结构。通过doc_id拿doc_value的过程是fetch过程,因为要从一堆文件里面定位doc_id所以这个过程其实也很耗时间。

7.es5之前数值索引都是先转换成string然后用前缀fst的方式索引查询,这样的问题是range特别慢,在前缀树上面找range要递归,es6使用了bkd-tree,也就是lsm-tree版的kd-tree,不用b-tree是为了兼容数值型和多维类型的查询。kd-tree的速记方式是二叉树,第一级按第一个维度排序,第二级按第二个维度排序。。。查找的时候也是一个一个维度拿来查

8.es的多条件查询是取交集的形式可能会比较慢,如果配合mysql的查询扫描方式会有很大提升

9.lucene有内存buffer和磁盘segment(可以看成一个独立的子索引)类似lsm tree所以写入高效,但是不能实时查询是因为要给segment建倒排索引,这一步是buffer刷成segment的时候做的所以只能准实时,通过这个lsm tree可以做id到doc的查找

10.es扩展了lucene加了一些系统字段,_uid主键,_version,_seq_no,_source来支持update,_field_names支持稀疏列判断,_routing数据分布,利用的是lucene本身的能力很cool
11.es有个transLog有点像metaq,持久性保证需要每次请求刷盘,如果要实时性可以从transLog查数据,查transLog只能byId,根据倒排索引查询只能准实时。log写放在内存写之后,一个是防丢数据一个是实时查询
12.lucene不支持update,es支持的方式是在内存里面merge成一个新doc,然后add到lucence里面去,其中version起乐观锁的作用
13.海选,粗排,精排,海选一般是根据记录某属性做简单分层,粗排一般做简单算分排序召回1000到2000数据,精排可以有比较复杂的运算做出最终排序



TIDB

1.sharding策略有两种range和hash,range优势是范围扫描和扩容缩容比较好做,tidb使用的这种,缺点是顺序写热点。hash没有顺序写热点问题但无法做范围扫(不过实际业务中的一些加了分库键的范围扫还是可以的),还有就是扩容缩容比较麻烦容易导致较大抖动

2.tidb使用的是多raft-group,就是说每个shard(region)是一个group

mvcc & read skew, write skew

1.time oracle为了解决外部一致性问题,试想如果没有中央协调器,怎么能做到准确的读到一个事务的完整状态?很可能部分节点写完成了,部分节点还没有。而节点很多的分布式环境一个中央协调器又会成瓶颈,所有就需要一个全局的事务id来界定事务所以有了time oracle

2.write screw 两个不同的银行账户各有100块钱,约束是总钱数必须不小于100,两个事务都是先做检查然后取100,mvcc并发状态下两个事务都不违反约束又不会有mvcc冲突。stm内存锁的解决方案是进入事务前都把所有的资源都加个读锁,但是分布式协调器的情况无法统一加锁,所以才有了google的time oracle的方案  

3. percolator没有对read加锁,所以只是一个snapshot isolation,而couchdb这样的对read加锁实现的是serializable snapshot isolation隔离级别,但有性能代价

4.read screw r1[x], w2[x], w2[y], c2, r1[y], c1 or a1

5.引发的问题称作事务的外部一致性

时钟策略

1.逻辑时钟,每台机器维护本地序列号,如果一个事务跨机器从机器1传递到机器2,机器2会比较机器1传过来的序列号和本地的最大序列号取最大的,不同机器的数据只要在事务中形成交叉就相当于建立了逻辑关系。这种方式的缺点是不同事务如果没有数据交叉可能序列号的gap会很大。

2.hlc,物理逻辑混合时钟,在物理时钟不变的情况下新事务只增加逻辑时钟,如果物理时钟变了就跟着最大的物理时钟走。这样相当于用逻辑时钟来校准物理时钟,但不同事务又能通过物理时钟比较大小,但是hlc也无法完全做到不同的没有数据交集之间的事务之间的因果律。

Percolator

1.Percolator是为了补充map-reduce全量离线计算的不足而产生的增量索引系统,首要的优势就是增量计算时效性好。增量计算和全量计算的区别在于全量计算丢弃之前的所有计算结果全部重来,而增量计算会根据新的数据去查找之前相关联的计算结果,然后在之前的计算结果上做局部运算。计算的可加和性是增量计算是否便于实施的关键

2.为啥需要事务?爬虫从多个url抓到了相同的内容,只需要将pagerank最高的url加到索引,但是每个外部链接也会被反向处理,让其锚文本附加到链接指向的页面,指向复制品--pagerank低的那些页面的链接必要时也会被修改指向pagerank最高的url。这里就涉及到正向索引,反向链接锚文本和指向复制页面链接等几个状态的修改,所以需要一致性事务。还有一个可能的原因是一个文档的pagerank,内容hash这些可能不是放在一张表里面而是分开存放的

3.percolator的问题域特点是海量数据和延迟容忍度相对高,延迟容忍高所以对脏锁清理可以比较懒惰

4.percolator也是2pc事务但是和传统的2pc事务最大的差异点在于没有中央协调者,而这个特点带来了极大的可伸缩性

5.能做到无协调者的几个关键因素是,mvcc快照隔离级别,bigtable的mvcc单行事务和可扩展性存储,轻量的统一时间戳提供者

6.percolator的表的一行内容是,C:data(实际存储的数据),C:wirte(控制数据可见性是否commit),C:lock(事务用的锁),C:notify(通知机制用,避免全数据扫描),C:ack(防止重复通知)

7.mvcc和乐观锁的区别在于mvcc可能是多行的一个统一的状态,比如事务1写了行a然后去读取行b,这中间事务2写了行b,那事务1应该是读取事务2写之前的版本,所以这里就需要有时间戳比较或者版本号。而乐观锁只是锁定一行就行了。单机版的mvcc时间戳比较容易做,而分布式的mvcc就需要time oracle

8.论文中代码需要注意的几个点(1)被实现的Transaction是分布式事务,调用的bigtable::StartRowTransaction是bigtable单行事务 (2)oracle.GetTimestamp()是取统一时间戳
(3)注意区分代码中读写数据的时候"lock","data","write"几种不同的标签 (4)Get方法最有意思的是先去读一下当前row的锁,如果读到了就可能清理过期的锁这也是因为没有协调者所以只能靠读的时候清理锁 (5)数据提交过程类似2pc分两步,第一步Prewrite写入数据和锁信息,这一步可能会发生冲突而失败,第二步真正让数据可见也就是写入"write"列并且会把锁擦除。(6)注意每次写入的时候会选一个Primary Record,这个玩法类似传统2pc Primary写入成功作为如果发生异常后面的客户端判断这个事务该回滚还是该提交的标志。(7)提交的时候擦除锁是一个一个进行的,所以Get()方法碰到锁的时候有一个等待,这样就不会读到部分成功的状态

9.time oracle通过定期分配一个时间范围放在内存的方式避免磁盘io,worker也有定期批量从time oracle获取时间戳的机制,这样time oracle单机tps近两百万,这个地方无法得知更多细节但是思想大致能懂

10.数据触发器这点上使用的是弱一致的通知语义避免锁占用时间太长,而实现通知语义使用的是线程范围扫描,有两个提高性能的点,一个是使用notify列减少扫描数量(因为有变更的毕竟是稀疏的),第二个是后面的线程扫描和前面的线程冲突的时候会再做一次随机选择避免公交车效应,这里对细节不是很清楚

11.Percolator处理一个文档需要50个bigtable操作,rpc多,所以使用了很多微batch等待打包rpc请求,还有通过对一行数据的预读的方式提高性能

spanner

1.和percolator最核心的区别是spanner是全球分布的而且是一个数据库,所以不能用单一的time oracle,数据时效性也比较重要所以必须有事务协调者

2.多个time server就意味着时间差异,spanner使用了原子钟+gps保证time api高可用而且时间误差非常小,多台time server之前会互相做时间校对,而且每个时间客户端也会从多台time server拉取时间获取综合值

3.有了时间误差就有了置信区间的概念,从time api取得的时间是有误差区间的,所以分配时间戳的时候会把误差区间考虑进去从而保证事务之间的绝对时间顺序关系,甚至会采取等待的方式

4.数据高可用是采取带leader的paxos变种实现的,最基本的单位是tablet(类似于数据库表),每个tablet的几个replica形成一个paxos group,replica的数量和分布都灵活可配置。读写发生在多个paxos group就形成了分布式事务

5.spanner牛逼的一个地方是数据分片会很灵活,Spanner底层存储的是有序的KeyValue集合,在数据模型上,细化了directory这个概念。一个directory是连续的一组KeyValue,比如user_id=1的用户和他所有的相片信息就组成了一个directory,多个directory组成一个tablet,但又不像bigtable那样每个tablet里面的主键都是连续的按范围划分的。而是通过placement server做动态调整,把经常一起使用的向物理位置近的地方放,做到尽量优化

6.分布式事务采用的是2pc+mvcc的方式实现,每个事务都会有协调者,协调者的选举应该也是和高可用的leader选举类似。剩下的核心点就是怎么控制time api的误差来做到先提交的事务先可见。之前有个疑问是为啥不用协调者生成事务id,这是因为协调者可能会经常变,而且不同事务的协调者可能不是一个

7.spanner的数据模型还是google最常用的基于嵌套结构的半关系型模型,spanner tablet的存储是类b tree,这个细节会有什么影响还需要后续了解

google dremel

1.第一个核心问题是通过列的方式存储嵌套的数据结构,所以有了r,d的数据描述和有限状态机的数据读取形式
2.r代表当前记录和上一条记录在哪个嵌套深度有公共节点,d代表当前记录walk到根节点需要走过的记录数,d对有内容的节点其实意义不大因为都应该是一样的但是对null意义比较大,能还原出null所在的层级,通过r和d就能完整的还原出整个层次结构
3.只存储叶子节点的数据,读取过程是通过有限状态机在不同列之间跳转读取来还原嵌套结构
4.查询的时候好像也没有什么大招,就是先还原出嵌套结构,然后过滤,但感觉这个顺序可以穿插起来如果是单条件查询感觉还是没有什么好招去加速
5.嵌套的数据结构和关系型一对多多层关联相比,保留了数据关系,跨层关联的时候比如查询Country是xxx的name有多少个,关系型数据库就是三层join,而嵌套型结构需要遍历的数据会少很多,在dremel里面可能是先查一下Country列,然后根据r和d判断一下对应的name就可以了,都不一定要走中间language层。嵌套型的缺点是可能会有大量数据冗余,视具体情况而定

Durid

1.实时olap,亚秒级查询,实时数据导入立即能查到,不支持join,列存,有时间序列数据库的特点,带上时间查询会方便寻址  

2.olap基本理论,列按照职责划分timestamp column(时间戳),dimension columns(过滤或者聚合的维度),Metrics(聚合和计算的基本数值)

3.durid不存基本数据而是在数据导入的时候预先对数据做各个维度的first level roll up即预先做好基本聚合,是典型的molap,这个和工作中遇到的数据应用场景完全吻合

4.durid分实时节点和历史节点,实时节点定期merge到历史节点,查询也可能merge实时节点和历史节点的数据

5.segment是存储的基本单位代表一个时间窗口内的数据,Segment文件名称的格式:dataSource_interval_version_partitionNumber

6.完全的列存,只保存列值不保存row-key,会采用字段表的方式压缩,还有位图信息做过滤筛选

Kudu

1.融合oltp和olap,hbase,cassandra这些大范围scan性能有限,纯列存数据库随机读写性能差。kudu大范围scan性能高,随机读写性能不太差

2.真正的列存而不是sstable,schema固定,数据有类型。有一个隐性的行号列,通过这个行号列来定位列里面的记录

3.分区方式是hash+range结合,兼顾范围扫描和避免局部热点

4.RowSet是最小存储单位,又分为MemRowSet和DiskRowSet,内存中用的是行存储刷到硬盘中使用的是列存储。Kudu的存储是base+delta的形式不同于lsm,lsm永远先写MemStore,Kudu是insert先进mem但是update可能会直接写到disk,lsm的key对应的值可能在多个store但是kudu的只会在一个RowSet的范围内,虽然可能同时在MemRowSet和DiskRowSet甚至delta file里面  

5.一个RowSet只有一个MemRowSet,数据insert的时候都会进MemRowSet,实现是一个并发优化过的b+ tree。RowSet有一个DiskRowSet(base file)和多个DeltaFile(undo,redo record)

6.DiskRowSet是列存,会被切成多个page而这些page通过b tree管理,也用了bloom filter来加速对pk的查找。DeltaStore也分Mem和Disk,会组织一个map,key是(row offset+timestamp),value是rowchangelist。update的时候会先通过bloom filter和b tree从DiskRowSet里面找到记录的offset,然后在delta file对应的offset的位置写入值

7.kudu也支持mvcc以及external consistency,也有类似于time oracle的hybrid time

Calvin

1.解决事务并发冲突的办法是让事务根本无法并发,把所有事务做一个全局排序变成确定性的,有一个关键点是batch,不然无法做依赖排序

2.许多sequencer拦截所有的事务请求然后保存到局部一点的存储并同步到其它sequencer对应的存储,这些存储又被一个中央的meta存储管理

3.很多scheduler并行去执行这些事务,如果所有事务单线程执行当然不会出错但效率太低,我的理解是scheduler会分析这些事务的数据依赖,然后尽量并行它们。这个方案给我的感觉有点类似oz的思想

图数据库  

1.索引存储的方式是用顶点+邻接表的方式存储,所以邻接表拿到顶点列表,以及通过顶点拿边都是O(1)的时间,所以就很快了。而mysql join找相邻顶点列表,以及通过顶点找边都要扫表    

2.比如有一个user表,一个item表,要存储好友关系和购买关系,就会有一张friendship表和一张purchase表。这个存储方式和关系数据库里面自己建表的方式区别不大,但是在查询的时候我理解图数据库是用图搜索的方式推进的而关系数据库是通过join实现的,所以在查询多层关系(>3)的时候图数据库性能有指数级别优势


向量化执行引擎

相关的两个关键字是列存和SIMD,SIMD是多个操作数组成向量一个指令完成多个操作数运算。向量化执行引擎充分利用SIMD,而只有列存的情况一列数据才会很容易load成向量利用SIMD进行操作


KAD(Kademlia)算法

1.这是一个分布式hash的算法,可以参见文章https://www.jianshu.com/p/f2c31e632f1d  

2.首先是存储内容按hash值尽量均匀分配存储到多个节点,并且有冗余备份

3.算法的核心是怎么寻址,地址不可能在每个节点全量存储,所以每个节点只能存部分地址,这种情况寻址肯定是跳多个节点。整个算法的亮点是发明了一种xor距离的概念,xor距离带来了距离分层2^0, 2^1, 2^2 ...在相同层次k的节点之间的xor距离必定小于2^(k-1)。所以这样带来的结果是最多k次寻址就可以找遍2^k个节点,而且每个节点的地址表理论上存k个地址就够了


IPFS(星际文件系统)

1.对标http,http是中心化存储,ipfs是真正意义上的分布式文件存储。而且由于merkledag的数据结构,可以做到分布式环境下的去重。  

2.个人理解的核心是把文件拆成块使用merkledag数据结构存储,可以起到文件寻址,去重,防篡改等类似区块链的效果,然后对每个块使用KAD算法做分布寻址。  

3.在应用上增加激励机制来鼓励分享这样会有很多应用的想象空间。


Succinct Trie

1.高压缩trie tree,succint数据结构是一种高压缩数据表示方法  https://www.jianshu.com/p/47a61caa6490

2.succint主要是用位图的方式表达了树的结构关系,核心在于rank1(0)和select1(0)两种操作分别代表第position[0 ... x]中1(0)的个数和第x个1的position。对树的编码方式是节点r有0个子节点就是0,有1个子节点是10,2个子节点110 ... 然后按广度遍历的方式放置成一个bitmap
参考文章https://www.jianshu.com/p/36781efac8e9  

3.树的子节点或者父亲节点都能用rank和select两种操作简单表达出来

4.Fast Succinct Tries分sparse和dense两种,dense一般在trie上层因为上层一般比较满,sparse在下层。dense每个节点,1层放所有字符比如a-z的bitmap,2层bitmap放有没有子节点,3层bitmap放是否是前缀结束,4层是合法前缀对应的值。查询的时候2层用来做tree导航。sparse结构每个节点,1层放字母的二进制,2层用一位表达是否有子节点,3层表达是否是父节点的第一个子节点,4层放合法前缀对应的值。2,3层用来做tree导航。

5.Fast Succinct Tries点查比bloom filter差,范围查好很多


Fractal tree(toku db)

1.mysql(innodb) pk 顺序插入快,随机插入慢。原因是随机插入可能造成节点分裂写放大,存储不连续碎片化。
2.mysql有插入缓冲的解决方式只能针对非主键,大致做法就是增加一个缓冲区,批量刷出。
3.fractal tree的解决方案,结构上类似b+ tree,但每个节点都带msg buffer。插入操作的时候先找到某个节点对应的msg buffer然后放进去就ok了,后台线程异步刷出去。怎么找这个节点的细节还不太清楚。查询的时候需要合并整个查询路径上的msg buffer上面的信息,所以会比较慢。

RAFT

1.通过广播消息+timeout选master,拿到多数票胜。
2.只写master,写了之后记wal日志然后广播到其他节点,拿到多数日志算提交记录commit日志并返回,否则回滚。这里wal日志和commit日志也可以合并。
3.新节点通过master的commit日志来追进度达到一致状态,也可以通过定时的snapshot快速追状态。

存算分离

1.解决成本问题,比如双11只是计算突然飙升但是存储没有太大增加,扩容性价比太低
2.存算一体数据库扩容故障恢复要搬数据(多数情况),恢复时间长
3.存算分离可以使用比一体式方案大很多的存储,大幅度减小分布式事务分布式查询的概率
4.mysql存算分离不是简单的把本地存储换成云存储,需要有存储引擎的较大改变,比如Aurora,polardb,cynosdb。但似乎存算分离的数据库还是单实例数据库,并不能算分布式数据库。
polardb的分布式应该是在存算一体的基础上附加了分布路由和事务
5.这样做的原因是(1)大量存储空间浪费,page+各种log*主备实例数*云盘三副本(2)本地io变网络io各种性能消耗,带宽消耗
6.db即redo log,只保留一份redo log,其他所有的数据都可以从redo log恢复。写入操作只要redo log落存储层就算成功。redo log被分成多个segment来实现扩展性,log和data遵守一致的分片策略。存储层有能力从log恢复出数据页(传统数据库是计算层做的),计算层读取缺页的时候可以直接从存储层读取
7.计算层依然分主从,但主从都从统一的存储读取。计算层完全无状态,可以快速恢复。
8.CynosStore存储的特点,第一是非对称读写,(写的是日志、读的是数据)。第二是异步写,同步读。第三并发写入、日志串行化。第四是支持两层,块设备层、文件系统层。第五能接入任何基于日志先写的存储系统。第六是存储分布式化

RoaringBitMap

1.主要为了解决bit map稀疏时浪费空间较大的问题,比如32位数的bitmap存储是512M,但如果数很少会比较浪费
2.Roaring Bitmaps把32位数的空间划分成2^16个桶,每个桶最多可以存放2^16个数,刚好覆盖整个空间,桶里面存的是数字的低16位
3.保存一个数的时候用数的高16位作为桶的编号,如果桶内的数较少少于4096,就存成array,这样比较省空间,比如只有4个数就只用存4*2个字节,array动态扩容。如果数较多就会存成bitmap,2^16个bit是8KB的空间
4.如果存满是65536*8K=512M,和bit map相同。参考不深入而浅出的Roaring Bitmaps原理
5.64位不能简单像32位那样搞2^32个桶因为一个2^32的数组占用空间已经很大了。一种做法是前32位用红黑树存,更好一些的做法是前48位用art tree存,后16位还是放在一个个桶里,art tree可以认为是可变长的tried tree,每层的节点数可以是4个,16个,48个或者256个。

Z-order Index

  1. 参考Z-order是如何提升查询性能100倍的
  2. 解决的问题还是b tree多维度索引必须前缀匹配的问题,z-order的思路是把多维的信息压缩成一维的顺序,而且每个维度平等,尽量保证局部性
  3. 压缩下来的效果的例子(a1,b1),(a1,b2),(a2,b1),(a2,b2),(a2,b3),(a2,b4),(a3,b1),(a3,b2)...
  4. 二维信息压缩的方式,将x和y表示为二进制以后,每个bit位相互插值即可产生Z-Address
  5. 这样做的好处是多个维度查询都能获得不错的性能,但缺点是单一维度没法获得极致的性能了,这可能也是clickhouse没有使用z-order的原因
  6. 本质是一位有序路径上尽量保证高维度上的局部性,Z-order的Z字形路径其实还不完美,还有希尔伯特曲线路径等等

向量检索

  1. 向量索引对LLM有用,因为LLM context不能太长(内存,计算复杂度,长期依赖),向量化可以储存context知识,相当于LLM的一份外存
  2. 一种比较优秀的向量检索算法是scann
  3. 算法的主要思想是,近似计算,信息压缩,分片查询
  4. 首先是Vector quantization,快速理解可以看矢量量化,把高维(d)向量拆成M块,对每一块做KNN选256个中心点(8位),然后把每一块数据替换成0-255中间的一个,这样原来的数据变成了M个byte。原来的内积相似搜索O(Nd)变成现在的O(256*d+MN),M<<d
  5. 同样通过K-means把把空间划分成多个部分,看查询点q处于哪个部分,然后只比较那个部分所有的点就好了
  6. 在这些的基础上scann提出了各向异性量化损失函数提升计算精度,细节没太懂基本思想是和传统的内积损失函数比,增加了和向量方向相同的权重,降低正交方向的权重

XOR Filter&Ribbon Filter

  1. xor filter 比Bloom Filter节省25%空间!Ribbon Filter在Lindorm中的应用
  2. bloom filter的主要问题是空间占用较大1.52n(n=log...)所以有了省空间付出额外cpu的xor filter(1.23n)
  3. xor filter的主储存是m=1.23n个元素的数组,每个元素r位。查找的时候对于key,算三个hash到数组三个位置拿出三个r位的元素做xor,xor的结果用来和footprint(key)做比较,不相等就确认false
  4. footprint函数怎么来的还不太清楚,已有footprint函数和3n个hash函数的情况下构建xor filter的过程大致是每个元素找不和其他元素hash后发生位置冲突的位置,重复这个过程直到所有元素放进数组(如果找不到了就换hash函数重来)。最后插入的item位置直接设置对应footprint函数的值,然后开始反推其他item位置对应的hash值(用已有hash xor footprint)
  5. ribbon filter是在xor基础上进一步优化空间,基本思路是类似的,主要的区别在于三个hash函数的结果是一个向量,filter数组是一个0,1矩阵,filter的过程是计算矩阵乘法。filter构建的过程是高斯消元法(解多元一次方程组)
  6. ribbon filter的filter数组是列存,过滤时不需要一次全部加载可以一列一列计算有短路逻辑
  7. ribbon的r是7而xor是8所以ribbon的存储是1.101,之所以能做到是因为矩阵信息更密集?



2016年8月6日星期六

对区块链的一点理解

读了一篇很不错的讲区块链原理的文章,整理了一下思路 http://o.btc123.com/data/docs/easy_understood_bitcoin_mechanism.pdf



在知道谜底的情况下重新推导一下

1.需要去中心化,所以选择分布式存储,数据量可接受,所以存全量
2.交易数据需要正确性,顺序性,所以存储全部历史,链式存储
3.交易安全保证,使用公匙,私匙协议,这个比较常见
4.需要解决拜占庭将军问题,首先是一致性问题,这个类似于抢占式协议
5.数据伪造问题,这个是很多分布式协议里面碰不到的,所以有了牛逼的工作量证明协议
6.工作量证明协议最重要的是,验证容易,计算困难,计算过程可协作
7.本质上还是一个保证一致性的分布式存储系统

少了一个Merkle Tree

1.这个结构用来在整个链中间验证交易是否存在
2.从最长的链拿到根hash值和该交易认证路径,然后根据路径重新计算根hash值和实际的比较就行了
3.在数据库类型的系统中会有这个快速定位修改

之前对pk的过程理解得不是很清楚

1.主要是为了防止双花,就是说a客户端先给b付了一笔钱,然后又把这笔钱付给c(前提是这个客户端被改过了,不然本地校验过不去),然后创建一个强大的支线链路争取超过第一笔交易
2.但是第一笔钱已经跑了6个区块了,第二笔钱的链无论怎样也追不上,所以不会被系统承认,如果没有跑6个区块那第一笔交易其实没有达成如果被第二笔超过那就只成交第二笔就好了
3.之前有个地方理解不对,分支长链并不会把之前短链的交易都干掉,只会让短链上的工作都转到长链上面来


2016年4月28日星期四

学习一下kafka

学习的代码版本是0.8.2.1

原理

组件图


存储

目录结构

tmp$ tree kafka-logs/
kafka-logs/
|-- huying_test-0
|   |-- 00000000000000000000.index
|   `-- 00000000000000000000.log
|-- huying_test_ultimate-0
|   |-- 00000000000000000000.index
|   `-- 00000000000000000000.log
|-- huying_test_ultimate-1
|   |-- 00000000000000000000.index
|   `-- 00000000000000000000.log
|-- huying_test_ultimate-2
|   |-- 00000000000000000000.index
|   `-- 00000000000000000000.log
|-- huying_test_ultimate-3
|   |-- 00000000000000000000.index
|   `-- 00000000000000000000.log
|-- recovery-point-offset-checkpoint
`-- replication-offset-checkpoint

一个目录代表一个(topic, partition)组合,在代码中是一个Log对象,每个Log包含多个LogSegment,一个LogSegment包含一个log文件(FileMessageSet)和一个index(OffsetIndex)

Log

+----------------+
|offset 8(bytes) |
+----------------+
|messageSize 4   |
+----------------+
|CRC 4           |
+----------------+
|Magic 1         |
+----------------+
|attributes 1    |
+----------------+
|key 4           |
+----------------+
|size 4          |
+----------------+
|content         |
+----------------+
+----------------+
|offset 8(bytes) |
+----------------+
|messageSize 4   |
+----------------+
|CRC 4           |
+----------------+
|Magic 1         |
+----------------+
|attributes 1    |
+----------------+
|key 4           |
+----------------+
|size 4          |
+----------------+
|content         |
+----------------+

OffsetIndex

+------------------+
|relativeOffset 4  |
+------------------+
|positionInByte 4  |
+------------------+

LogManager startup

1.遍历所有的日志目录,如果有.kafka_cleanshutdown就说明是干净的shutdown,跳过当前log的recovery
2.读取recovery-point-offset-checkpoint文件,返回(topic, partition)->offset 映射。其中文件的第一行是版本号,第二行是记录条数,后面是映射内容
3.计算出log目录和对应的recovery point,并用来初始化Log对象。
4.清理工作,清理掉.kafka_cleanshutdown
5.启动三个定时任务,kafka-log-retention, kafka-log-flusher, kafka-recovery-point-checkpoint
6.retention是删除时间过久,体积过大的log文件。flusher是根据时间间隔刷盘,调用FileChannel.force。recovery-point-checkpoint是把内存里的(topic, partition)->offset checkpoint结构刷盘。
7.启动LogCleaner

new Log()
1.删除掉.deleted和.cleaned后缀的文件
2.碰到.swap说明在swap中途server挂掉,这里的swap过程仅仅是一个rename。这种情况对于index的swap直接删除后面从log文件重建,如果是log的swap就先删index再通过rename的方式swap回log
3.遇到.index索引文件,如果没有对应的.log文件直接删除
4.通过.log文件生成LogSegment对象,如果没有索引文件就运行LogSegment.recover重建索引
5.重建索引的过程是以message为单位遍历LogSegment,遍历过程中如果累加的大小超过了index interval就在索引里面记录一下当前消息偏移量(会转化成相对偏移)和.log文件里面的字节为单位的位置。这个过程中会顺便把log文件和index文件末尾的可能因为crash产生的多余的字节给清理掉。
6.开始根据recovery point做恢复,这里碰到cleanShutdown文件就会跳过恢复
7.遍历从recovery point到末尾的LogSegment,调用LogSegment.recover,如果过程中发现多余的字符说明LogSegment非法,就把非法的一直到末尾的全部删掉。
8.最后对所有的index做一遍sanityCheck

Log.append

1.参数是MessageSet一次append一批消息
2.分析校验,拿到是否递增,压缩codec,以及校验大小,crc,干掉多余的字节
3.后面的所有过程都加上Log级别的锁,如果需要设置偏移量,就遍历每条消息,设置上偏移量
4.再做一次消息大小检查,因为前面可能重新压缩过消息
5.如果当前LogSegment大小加上要append的大小超过上限就滚动到新的segment
6.用当前Log的logEndOffset生成新的log file和index file,之前的末尾index文件做一下裁剪
7.用新的log file和index file生成新的LogSegment并加入Log里面的列表,并且把recoveryPoint到之前的logEndOffset的segment加入刷盘队列,刷完了以后recoveryPoint增长到logEndOffset
8.如果增长的字节量达到了需要做索引的长度就在index里面append一个entry,然后FileMessageSet.append
9.如果append以后没刷盘的字节过多就刷一下盘
10.一个有趣的点是这里并没有有mapped file但是在索引的地方用了,而rocketmq是主log文件也用了,个人理解是只有读也比较多的时候用这个才有价值,只是一次append然后一次读意义就不是很大

Log.read

1.参数有一个startOffset,首先定位到offset仅仅比这个低的entry,如果定位不到就直接异常
2.调用LogSegment.read,首先要找到log文件里面对应startOffset的第一个合法的position
3.查找过程先通过index查找offset对应的position,查找的过程是标准二分查找,因为用了mmap所以都是在内存里面找。index因为需要经常的随机读和append操作,所以做了mmap
4.调用FileMessageSet.searchFor,在log文件里找大于offset的合法message的position
5.用同样的方法计算出end offset对应position,在算出需要读取的字节数
6.通过这些信息,返回一个FileMessageSet,这里还没有实际读取,后面会提取出byte message返回

LogCleaner

1.如果对一个消息存在key相同但是offset更高的消息,那么就清理掉这个offset低的消息
2.先找最脏的Log,找的方法是先读取cleaner-offset-checkpoint里面存了每个log的最后清理offset,然后每个log拿最后清理的offset到active offset所有segments的size和log size算一个比率,然后取比率最大的
3.开始清理Log,先遍历所有脏的LogSegment,构建一个内存中的OffsetMap(index),key是消息key,value是消息offset,这个map后面用来判断消息是否是重复的
4.把所有的LogSegment分个组,每个组的字节大小总量尽量接近于某个指定值。尽量把一个组压缩成一个新的Segment,这样做让新的Segment体积尽量平均
5.拿出每个组的第一个segment以它的名字创建log和index对应的.cleaned文件
6.对于组里的每个LogSegment,过滤掉重复的和空的message,其余的全部写入那个新的目标LogSegment
7.新的LogSegment刷盘,然后把旧的segment list全部删除,最后把.cleaned名字改成.log文件
8.为啥会有这种清理策略呢?其实是为了用topic保存位点做准备工作,保存位点的场景key比较固定只需要最后状态,所以用这招很好。如果是交易订单的场景就不合适了

网络协议

request and response

1.首先解析2字节的requestId
2.根据requestId选择kafka.api包里面对应的request类来处理,序列化反序列化协议自包含在对应的request类里面
3.有一个correlationId做request和response的关联
4.拿ProducerRequest做例子
+----------------------------+
|     requestId 2            |
+----------------------------+
|     versionId 2            |
+----------------------------+
|     correlationId 4        |
+----------------------------+
|     clientId               |
+----------------------------+
|     requiredAcks 2         |
+----------------------------+
|     ackTimeoutMs 4         |
+----------------------------+
|     topicCount 4           |
+----------------------------+
| +-------------------------+|
| |   topic                 ||
| +-------------------------+|
| |   partitionCount 4      ||
| +-------------------------+|
| | +----------------------+||
| | | partition 4          |||
| | +----------------------+||
| | | messageSetSize 4     |||
| | +----------------------+||
| | | messageSetContent    |||
| | |                      |||
| | +----------------------+||
| +-------------------------+|
+----------------------------+

Message

+--------------+
|CRC 4         |
+--------------+
|Magic 1       |
+--------------+
|attributes 1  |
+--------------+
|key 4         |
+--------------+
|size 4        |
+--------------+
|content       |
+--------------+

消息发送

1.发送分同步发送和异步发送,参见Producer.send,最终都会调用DefaultEventHandler.handle
2.发送消息之前如果topic metadata的缓存更新周期到了先随机找一台server拉一下数据更新brokerPartitionInfo
3.如果这条消息没key就和以前发送的partition相同或者随机选一个分区,如果有key就通过partitioner计算分区号,发送的broker选择leader broker
4.发送失败的消息会进行重试,有条件remainingRetries > 0 && outstandingProduceRequests.size > 0进行判断
5.ProducerSendThread里面有对异步消息批量发送的机制,通过数量多和超时两个条件判断是否该发送消息。可以选择是否要ack,有个配置项request.required.acks
6.server端KafkaApis.handleProducerOrOffsetCommitRequest,首先如果是offset commit request就转换成对offset topic发送消息的producer request
7.调用Partition.appendMessagesToLeader,把message保存到本地的文件系统。调用replicaManager.unblockDelayedFetchRequests把block住的fetch request处理一下符合条件的给response。delay fetch需要满足的条件在DelayedFetch.isSatisfied,主要是积攒一定量的消息
8.根据ack方式的设置,或者直接给response,或者生成DelayedProduce放到ProducerRequestPurgatory里面等后面满足条件的时候再ack。条件在DelayedProduce里面,主要是检查所有replica的end offset
9.保序和线程模型的问题,服务端是多个线程共同处理请求的,所以如果想保序的多条消息同时被服务端处理肯定乱序,但因为有ack机制,发送端发一条ack之后再下一条,这样在同一时间点就只能存在一条需要保序的消息就不会乱序了。

接收消息

1.ZookeeperConsumerConnector主要处理consumer和zk的交互,/consumers/[group_id]/ids[consumer_id] -> topic1,...topicN 这个节点是consumer注册的临时节点并且存放了这个consumer订阅的所有topic,consumer会监听自己group里面其它consumer的变更,感觉是个坑,当一个group里面consumer很多的时候zk监听会非常的多
2.consumer会监听/brokers/[0...N],感觉也是坑。/consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id 记录partition的owner,rebalance的时候会重建
3./consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value 记录消费位点,还是感觉用专用的topic记录比较好
4.consumer启动api,Consumer.create(config)实际上启动了ZookeeperConsumerConnector,启动过程中如果配置了autoCommitEnable会启动一个scheduler定期扫描consumer对应的topic,partition并且提交位点
5. 调用ZookeeperConsumerConnector.createMessageStreams,首先在zk consume group的path下面注册client的临时节点,然后初始化3个zk listener ZKRebalancerListener,ZKSessionExpireListener,ZKTopicPartitionChangeListener
6. ZKSessionExpireListener不监听具体节点,在重连的时候重新注册client临时节点,触发loadBalancerListener.syncedRebalance。ZKTopicPartitionChangeListener监听/brokers/topics/huying_test,发生变化的时候触发rebalance,loadBalancerListener.rebalanceEventTriggered
7.ZKRebalancerListener监听/consumers/console-consumer-84505/ids节点,ZKRebalancerListener启动的时候会启动一个定时任务检查是否发生过rebalance事件,如果发生过就调用syncedRebalance。
8.rebalance的过程先检查broker,如果没有broker就注册一个listener。为了防止rebalance失败导致重复拉取,停止当前consumer上的ConsumerFetcher,清掉线程消费queue和KafkaStream里面的消息。接下来删除zk上的partition ownership信息,和内存里的注册
9.接下来调用PartitionAssignor.assign给当前consumer重新分配partition,有两种分配方式RoundRobinAssignor和RangeAssignor,RoundRobinAssignor先拿到所有的partition和consumer thread,然后分别排序再循环consumer thread去拿partition,但是只返回本consumer对应thread id对应的partition。RangeAssignor也比较简单,比如有10个partition,有5个consumer,当前consumer排序第二,那么就返回第三,第四个partition
10.对新的partition ownership创建临时节点并写入consumer thread id相关信息,针对partition调用ConsumerFetcherManager.startConnections,过程中会启动一个线程找zk拉取partition的leader信息,这个线程还会做一件事,addFetcherForPartitions启动ConsumerFetcherThread,ConsumerFetcherThread才是这正干活拉消息的
11.发消息使用的是SimpleConsumer,SimpleConsumer就是一个简单的blocking的rpc api封装。这里感觉kafka使用内存还是很残暴的,很多可以使用对象池和内存池的地方都没有做,new BoundedByteBufferSend,ByteBuffer.allocate(size)这样的代码随处可见。
12.KafkaStream的数据来自PartitionTopicInfo,有一个比较曲折的关联关系

同步消息

1.在ReplicaManager.makeFollowers里面启动ReplicaFetcherThread找leader拉取消息
2.通过SimpleConsumer(一个简单的rpc封装)发送FetchRequest,拿到response之后append到本地的log,然后更新一下本地的highWaterMark

创建topic

1.客户端分配partition和replica到不同的broker上面。分配方式是对partition的第一份replica按round-robin的方式开始点是随机的一个broker,第二份和后面的replica会加上一个shift,下面是源码中给的例子
broker-0  broker-1  broker-2  broker-3  broker-4
p0        p1        p2        p3        p4       (1st replica)
p5        p6        p7        p8        p9       (1st replica)
p4        p0        p1        p2        p3       (2nd replica)
p8        p9        p5        p6        p7       (2nd replica)
p3        p4        p0        p1        p2       (3nd replica)
p7        p8        p9        p5        p6       (3nd replica)

2.把topic信息和topic,partition和broker的关系写到zk上去
3.server端调用的发起点是监听了zk变更的TopicChangeListener,首先通过zk回调的数据和server缓存的数据差算出新增的topic
4.KafkaController.onNewTopicCreation,首先给topic注册AddPartitionListener,监听的路径是/brokers/topics/topicXXX,然后主动触发一次onNewPartitionCreation事件
5.进入PartitionStateMachine和ReplicaStateMachine的handleStateChange更新状态到NewPartition
6. PartitionStateMachine更新状态到OnlinePartition,在zk上初始化partition的leader和in sync replica的信息,zk path /brokers/topics/huying_test1/partitions/0/state data {"controller_epoch":6,"leader":0,"version":1,"leader_epoch":0,"isr":[0]} 这里有段注释需要注意,写zk可能会失败,当前机器可能会因为长gc失去和zk的session,所以后面catch一个zk node exist exception
7.对成为这个partition的leader的broker发送一个LeaderAndIsrRequest
8.负责管理发送的ControllerChannelManager会保存对每个broker的连接,发送也是先加到一个queue然后后面有一个线程异步发送
9. ReplicaStateMachine更新状态到OnlinePartition
10.broker收到LeaderAndIsrRequest调用ReplicaManager.becomeLeaderOrFollower,先判断controllerEpoch,如果比自己的小就丢弃这个请求,遍历请求中的每个partition,如果请求中对应partition的leaderEpoch小于当前partition的leaderEpoch就忽略当前partition
11.算出请求中partition对应的leader和follower的变更,对于变成leader的partition首先停止对这些partition的拉取job。
12.对每个partition调用Partition.makeLeader,对每个replica如果之前没有创建就创建一个,创建的过程是如果是远程的就只创建对象放在内存里,如果是local的就先创建一个Log,然后读取对应的highWatermark数据,最后生成Replica对象
13.highWatermark对应的replication-offset-checkpoint文件,和recovery point类似,但是recovery point是记录的刷盘的点,high water mark记录的是写入commit的点,所有的replica最小的commit offset就是high water mark。这里可能会增加high water mark
14.如果请求过来的topic是__consumer_offsets,那就启动OffsetManager的异步读。这个topic是用来管理所有的consumer的进度的,这样避免了把消费进度存zk上面影响扩展性。这个异步读会一直读取__consumer_offsets并把消息解码成消费进度放入缓存
15.对于需要变成follower的partition,如果是leader就调用Partition.makeFoller,首先如果本地的replica没有那就创建对应的Log,然后填充和清理一些本地内存结构
16.停止对这些成为follower的partition的拉取线程,把这些partition的Log截断到highWaterMark的位置,并启动对那些成为leader的partition的拉取线程
17.第一次创建topic的最后会启动highWaterMark的checkPoint线程,这个线程定期刷所有replica的checkPoint到磁盘


扩容

1.使用partition reassign tool
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate 
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}
2.PartitionsReassignedListener监听/admin/reassign_partitions,PartitionsReassignedListener会触发ReassignedPartitionsIsrChangeListener监听/brokers/topics/huying_test1/partitions/0/state
3.触发点是PartitionsReassignedListener,对每个partition先判断新分配的分区是否已经分配好,是否有对应的broker是挂掉的,这两种情况直接退出。然后注册监听器ReassignedPartitionsIsrChangeListener,再后进入onPartitionReassignment
4.KafkaController.onPartitionReassignment 初始状态新分配的replica只要有not in sync的状态areReplicasInIsr的判断就是false,首先把旧的replica和新分配的replica的并集更新一下内存和zk
5.每个partition也有一个leaderEpoch,用于ReplicaManager判断是否是旧的leaderAndISRRequest,这个属性保存在zk的原因应该是controller需要知道leaderEpoch信息
6.先更新一下zk上的leaderEpoch然后给并集所有的broker发送一个LeaderAndIsr来调整leader
7.把差集的replica设置成NewReplica的状态,这个过程也会发送LeaderAndIsr请求
8.这个过程可能会触发ReassignedPartitionsIsrChangeListener,会先判断这个partition是否结束了reassignment,然后判断是否所有reassign的replica都已经追上(isr),如果追上就再次进入onPartitionReassignment
9.把reassignedReplicas在replicaStateMachine里面转换成Online的状态,在内存里面把原来的并集替换成reassignedReplicas,如果leader不在reassignedReplicas里面通过partitionStateMachine发起一个新的partition leader选举,如果leader在的机器挂了也要重新选举,否则就只是发送请求更新一下leaderEpoch
10.把差集replica下线,下线是通过replicaStateMachine做的,依次做了几个状态变更OfflineReplica->ReplicaDeletionStarted->ReplicaDeletionSuccessful->NonExistentReplica过程中可能会停止replica fetch的线程和删掉Log
11.先把reassignedReplicas更新一下zk,然后更新一下/admin/reassign_partitions的内容,取消掉对应的ReassignedPartitionsIsrChangeListener,最后给每个broker发送一个更新metadata的请求
12.感觉这个流程主要做的是replica调整,并没有涉及到停写,和consumer消费完流程的控制,过程中可能会出现消息乱序。update,这里还真不需要,kafka一个topic多搞几个队列,扩容的时候每个机器assign的队列少一点就实现自动扩容了,缺点是文件多了,写性能可能会下降。rocketmq因为是双写而不是isr,所以扩容就需要停写和消费完。

高可用

high water mark

1.high water mark通常是一个partition所有replica的endOffset的最小值,也是同步提交模式情况下集群commit的位置
2.读的时候客户端最多只能读到highWatermark的位置
3.一个follower挂掉重启的时候首先扔掉它自己highWatermark之后的数据,然后开始追赶leader
4.leader挂掉的会重新选,新leader把自己的endOffset当做新的highWatermark,然后让其它的replica开始追赶

in sync replica

1.Partition会启动一个超时线程,调用Partition.maybeShrinkIsr,检查所有isr的end offset和最后更新时间,差距过大的就踢掉
2.处理follower的fetch request的时候可能会触发Partition.updateLeaderHWAndMaybeExpandIsr,重新检查这个replica的各种条件,可能的话就加回到isr

partition leader election

1.PartitionLeaderSelector.OfflinePartitionLeaderSelector是用得最多的leader selector,选leader的逻辑是从controller context里面拿出live isr和live replica,优先选择live isr的第一个,如果没有就选择live replica的第一个,否则就抛异常
2.更新zk path /brokers/topics/huying_test1/partitions/0/state
3.给这个partition的每个replica broker发一个请求LeaderAndIsrRequest,其中leader和follower的指令稍有区别。然后给所有活着的broker发一个请求UpdateMetadataRequest。这里的请求都是异步带callback(多数情况是null)的
4.发送请求如果失败会重试,终止重试的流程是zk那边发起,导致当前发送线程被关闭
5.接收到请求的broker开始ReplicaManager.makeLeaders, makeFollowers相关的流程
6.处理UpdateMetadataRequest请求比较简单,更新一下cache数据就ok了
7.触发点,KafkaController.onBrokerStartup, KafkaController.onBrokerFailure分别对应broker启动的时候和broker挂掉的时候,KafkaController.onPreferredReplicaElection,使用了工具
bin/kafka-preferred-replica-election.sh --zookeeper localhost:12913/kafka --path-to-json-file topicPartitionList.json

这个工具的作用是重新平衡partition leader的位置,因为broker的各种failover可能partition的leader会飘到一台机器上面去,这个工具可以使leader重新平衡。工作的原理是监听/admin/preferred_replica_election节点,触发一次partitionStateMachine.handleStateChanges,每个partition都选第一个replica当leader,由于之前分配topic,partition的时候已经在broker上面做了平衡所以这个时候leader的分配也是平衡的

controller leader election

1.controller是kafka全局的leader,controller负责监听zookeeper然后分析状态变更发送命令到其它的broker
2.broker启动的时候开始监听/controller节点,这里注意一个zk的bug(session失效可能过了一段时间临时节点才会删除,如果碰到node exist exception就返回可能会都选不上) LeaderChangeListener可能会触发KafkaController.onControllerFailover或者KafkaController.onControllerResignation
3.onControllerFailover首先会读取zk上的epoch,然后+1乐观锁方式更新zk,epoch用来标识正确的leader,当新leader产生旧leader没挂掉的时候可以作废掉旧leader的命令,broker里面关注epoch的manager保存一份epoch,接到比自己大的epoch就设定成新的
4.监听几个zk path,/admin/reassign_partitions,/admin/preferred_replica_election,/brokers/topics,/admin/delete_topics,/brokers/ids,/brokers/topics/topicXXX
5.初始化controllerContext相关的内存结构和一些manager,并给其它的broker发一个metadata更新的命令
6.如果配了自动平衡partition leader,就启动一个检查并自动平衡partition leader的scheduler,调用checkAndTriggerPartitionRebalance,不平衡率高的时候触发onPreferredReplicaElection
7.onControllerResignation执行相反的过程,取消注册之前的几个zk listener并且关闭几个manager

broker启动和关闭

1.broker启动的时候注册SessionExpireListener,这里调用的是zkClient.subscribeStateChanges(sessionExpireListener)而不是监听zk临时节点,感觉碰到机器假死会少了删临时节点的手段。然后注册临时节点/brokers/ids/xxx,这个临时节点会被controller的BrokerChangeListener监听
2.KafkaController.onBrokerStartup 首先给新启动的broker发送metadata
3.给新broker上的所有replica触发OnlineReplica的状态变更,会触发broker上面的replicaManager.becomeLeaderOrFollower,具体的逻辑和前面创建topic里面描述的相同
4.触发partitionStateMachine的状态变更,可能会导致leader重新选举,同时也会触发broker上面的replicaManager.becomeLeaderOrFollower
5.如果在新的broker上有partition reassignment,就调用onPartitionReassignment
6.broker关闭的逻辑类似,也是触发partitionStateMachine和replicaStateMachine的状态变化,稍有不同的是那些leader在这台broker上的partition要先触发一个Offline再触发一个Online

rebalance

脑裂问题和惊群效应

1.因为网络延时和不同的客户端连的zk server可能不同,不同客户端看到的zk状态可能不完全一致导致rebalance的时候的判断可能不太准确zk different client view,导致rebalance因zk ownership冲突而失败。有一个bug,[KAFKA-242]比较快的连续调用ConsumerConnector.createMessageStreams,可能会导致位点不正确前置导致丢消息,看起来发生在前一次ConsumerConnector.createMessageStreams导致rebalance结束前又调用了一次ConsumerConnector.createMessageStreams
2.一个broker或者consumer的变化可能会导致所有的consumer的rebalance。0.8版本用consumer thread id排序然后重新分配消费,中间插入一个consumer可能会导致很多consumer的分配变化,而且每次rebalance都是要关闭消费者(包括关连接和清queue),相当重。0.9 group reassignment 也是用的roundrobin 但是rebalance不会直接关闭连接

kafka 0.9的解决方案



1.增加了kaka.coordinator这个新包处理consumer的协调问题,GroupCoordinator是broker端处理所有group api请求的核心类。0.9包的客户端只有java版的
2.当一个broker变成__consumer_offsets的某个partition的leader的时候会异步的读取__consumer_offsets里面的消费位点和consume group metadata并缓存起来
3.KafkaApis里面ListGroupsKey,DescribeGroupsKey很简单就是取一下cache,HeartbeatKey也简单正常情况就是给个response然后schedule下一次heartbeat
4.SyncGroupKey,如果coordinator处于Dead,PreparingRebalance状态都是简单的不做处理,处于Stable也仅仅只是触发一下下一次心跳,处于AwaitingSync状态只处理leader client(加入group的第一个client)的请求,把当前的groupMetadata和assignment数据序列化成一条消息发到topic里面去。发送失败重新触发rebalance,发送成功本地保存一下assignment并且把状态转成stable。这里的回调看起来比较复杂,但实际上只是把返回状态码处理的逻辑传来传去
5.JoinGroupKey,加入group前先有一个简单的协议验证,协议在ConsumerProtocol比较简单,有一些版本号,topic,partition之类的信息。实际处理逻辑在GroupCoordinator.doJoinGroup进行,如果coordinator处于PreparingRebalance状态,收到请求增加或者更新一下group member然后触发下一次rebalance,处于AwaitingSync状态,如果碰到不在group里面的client那接添加member触发下一次rebalance,如果是已知的client先进行一次protocol比较也就是比较metadata,如果没有变化那就加入成功给response,否则就认为metadata变化了会更新metadata触发下一次rebalance。处于Stable状态,碰到新client或者client metadata变化都会促发rebalance。如果coordinator处于PreparingRebalance状态在最后会触发joinPurgatory.checkAndComplete把之前delay的操作刷掉。触发rebalance是往joinPurgatory里面丢一个DelayedJoin操作,DelayedJoin在tryComplete时会检查是否group里面的client都加入了,如果是会触发GroupCoordinator.onCompleteJoin,onCompleteJoin先移除失败的client,然后把generationId +1,选择一个protocol(取交集的第一个),把状态改成AwaitingSync,最后给每个client返回JoinGroupResult(有generationId,leaderId和选择的protocol)
6.LeaveGroupKey,先移除心跳配置,然后根据coordinator状态可能触发一次rebalance
7.GroupCoordinatorKey,读取group coordinator信息,首先读取topic __consumer_offsets的metadata,根据groupId算出group metadata应该在哪个partition里面,读取对应partition的partitionMetadata里面的leader信息这个leader就是group coordinator。比较重要的问题就是谁是coordinator,GroupMetadataManager.isGroupLocal->ownedPartitions.add(offsetsPartition)->loadGroupsForPartition->handleGroupImmigration->handleLeaderAndIsrRequest,所以__consumer_offsets这个topic对应的partition的leader就是group coordinator
8.KafkaConsumer每次拉消息的时候会先确保拿到server端coordinator的信息,过程中可能会发送GroupCoordinatorRequest。检查一下consumer是否在group内,如果不在就发送JoinGroupRequest,拿到response之后会发送SyncGroupRequest,前面的response会告诉consumer是不是leader,如果consumer是leader,会先调用performAssignment(这部分逻辑在WorkerCoordinator源码在外面)进行partition-consumer分配,request里面会带上group assignment,这些做完以后就实际拉数据了,比较简单
9.consumer的heartbeat启动在AbstractCoordinator.HeartbeatTask.reset,也比较简单,heartbeat失败就标记coordinator挂掉了,碰到ILLEGAL_GENERATION设置状态rejoinNeeded,这个状态位会使得消费的时候先停住发送JoinGroupRequest
10.流程串起来,consumer首先给自己知道的broker发请求得知coordinator地址,consumer通过心跳保持和coordinator的联系,如果心跳返回IllegalGeneration就意味着要rebalance,下一次消费消息会block住并且发出JoinGroupRequest,收到响应后如果是leader consumer会进行partition分配,接下来发送SyncGroupRequest,leader的request会让coordinator进入stable状态,follower会从coordinator那里拿到partition assignment,这个做完以后才开始拉新的partition
11.coordinator是__consumer_offsets这个topic的partition所在的leader机器,每次partition leader选举的时候新leader会从本地的log文件读取到GroupMetadata做缓存,consumer挂掉的时候coordinator把它移出group并且触发rebalance,coordinator通过IllegalGeneration通知consumer发生rebalance并且只有收到所有的consumer的JoinGroupRequest之后coordinator才会给consumer响应JoinGroupResponse,结束rebalance。coordinator也会监听topic partition的变化,必要的时候触发rebalance。consumer join group的时候会被coordinator分配一个consumer id之后的每次HeartbeatRequest和OffsetCommitRequest请求都带consumer id,如果哪次heartbeat请求或者commit offset请求的consumer id不对,coordinator会返回一个UnknownConsumer。

高性能

1.磁盘顺序写速度很快,pagecache的存在加速读写
2.攒消息成message set,网络上传大包,更大块的磁盘操作,内存连续性更好
3.协议固定,消息字节在broker,consumer,producer之间传输不需要修改
4.字节从pagecache到socket在linux可以通过sendfile优化

小技巧

1.SkimpyOffsetMap OffsetMap的开放式寻址实现,省内存,不能删除。
2.Throttler 一个小小的限流器,通过sleep限流。
3.多层结构的TimingWheel,解决了简单TimingWheel范围比较小的缺点。

ZK技巧

在看kafka之前让我设计broker和zk的监听结构,对于partition leader那一块我一定会这样设计

这样做的后果是当partition很多,broker也不少的时候zk上面的监听器会非常多,这样会导致zk的压力过大,而且可能zk有很多通知都是不必要的。而kafka的zk监听结构大致是这样,

broker先通过zk选出一个leader,然后只有leader监听zk上面的partition变化,监听到变化以后leader会决定是否给其它broker发送命令,这种结构zk的压力就不会增长得太厉害

ZK结构

截了一个简单的,有些需要流程触发的节点没有
/consumers
/consumers/console-consumer-84505
/consumers/console-consumer-84505/offsets
/consumers/console-consumer-84505/offsets/huying_test
/consumers/console-consumer-84505/offsets/huying_test/3
/consumers/console-consumer-84505/offsets/huying_test/2
/consumers/console-consumer-84505/offsets/huying_test/1
/consumers/console-consumer-84505/offsets/huying_test/0
/consumers/console-consumer-84505/owners
/consumers/console-consumer-84505/owners/huying_test
/consumers/console-consumer-84505/owners/huying_test/3
/consumers/console-consumer-84505/owners/huying_test/2
/consumers/console-consumer-84505/owners/huying_test/1
/consumers/console-consumer-84505/owners/huying_test/0
/consumers/console-consumer-84505/ids
/consumers/console-consumer-84505/ids/console-consumer-84505_huyingdeMacBook-Air.local-1460795322590-79d6b9c5
/config
/config/topics
/config/topics/huying_test
/config/changes
/controller
/admin
/admin/delete_topics
/brokers
/brokers/topics
/brokers/topics/huying_test
/brokers/topics/huying_test/partitions
/brokers/topics/huying_test/partitions/3
/brokers/topics/huying_test/partitions/3/state
/brokers/topics/huying_test/partitions/2
/brokers/topics/huying_test/partitions/2/state
/brokers/topics/huying_test/partitions/1
/brokers/topics/huying_test/partitions/1/state
/brokers/topics/huying_test/partitions/0
/brokers/topics/huying_test/partitions/0/state
/brokers/ids
/brokers/ids/0
/zookeeper
/zookeeper/quota
/controller_epoch

scala爽和不爽

ret.get(leaderBrokerId) match {
  case Some(element) =>
    dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
  case None =>
    dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
    ret.put(leaderBrokerId, dataPerBroker)
}
模式匹配,在接收请求的时候用起来特别爽
brokerIds.map { brokerId =>
  partitionReplicaAssignment
    .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
    .map { case(topicAndPartition, replicas) =>
             new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
}.flatten.toSet
server端编程大量集合操作的时候这种高阶流式处理集合比java方便太多
val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)
这里用到了Stream的lazy evaluation,制造了一个无限循环的队列,cool!

不爽的地方
1.debug的时候ide支持还是不够,有些表达进不去或者看不到值
2.有些写法写起来爽读起来痛苦,比如这样的
val streams = e._2.map(_._2._2).toList

发送端性能问题

Sarama假异步

sarama客户端本身是支持异步发送模型的,但是sarama老版本实际上做的假异步https://github.com/Shopify/sarama/issues/2103
在后续的版本中做了修复,https://github.com/Shopify/sarama/releases/tag/v1.31.0

服务端请求处理

1.网络层将请求投递到 RequestChannel 
2.多线程的 KafkaRequestHandler 并发的从 RequestChannel 获取请求, 交给 kafkaApis 处理

这里看起来会有顺序问题,但是processCompletedReceives里面会调用selector.mute禁读,所以服务端对同一个连接上的读请求是一个一个顺序处理的过程。这个时候在tcp buffer里面是可以缓存更多的请求的,通过这个buffer可以平摊掉客户端到服务端的网络延迟

WaitForAll

因为服务端对同一个连接上的请求是顺序处理的,所以WaitForAll的时候broker间数据复制的延迟会让服务端处理一个请求的耗时变长而这个耗时是没法通过异步的方式化解的。batch还是有可能平摊掉多个请求顺序处理的时间,但如果slave broker的io有问题batch效果可能也不明显?






2016年4月13日星期三

2PC和3PC一点理解

为啥不是1PC?

协调者直接发提交命令给每个参与者岂不很好?只能成功不能失败,如果参与者挂了等恢复的时候重新找协调者拉一下状态就好了。1pc主要是可能会碰到某些参与者因为条件不满足必须回滚,没有选择的机会。

BEST EFFORT 1PC

主要特点有两点
1.会把事务或者关键事务推到最后,先把容易出错的事情都做完,到了最后关头统一提交事务,这样就不太容易失败。  
2.在某些点失败了是要不断重试来保证一致性的。比如下单和扣款,下单逻辑走完后调支付接口,支付完成之后会不断回调下单系统(此时支付已经不能回滚),只有回调成功之后才会停止回调然后下单这边根据支付的结果决定提交还是回滚。

这种玩法限制还是很多的,比如一致性问题就比较需要业务方来保证,比较复杂的场景就不一定能保证。

2PC的问题

1.参与者timeout,第一阶段发生协调者直接rollback事务,第二阶段发生就必须等参与者恢复重新从协调者那边拿到状态做相应的操作。
2.协调者挂掉,第一阶段挂掉,部分参与者prepared,部分参与者初始状态,备用协调者启动以后可以继续发送prepared或者rollback。第二阶段挂掉,备用协调者启动后发现部分参与者是prepared,部分参与者是committed,继续发送commit。
3.协调者发送第一个commit的时候和接收的参与者同时挂掉,剩下的参与者全部是prepared状态,备用协调者启动以后不知道挂掉的参与者是什么状态,如果发送rollback可能参与者已经commit,commit可能不可逆,如果发送commit可能参与者已经被rollback,事务保存的数据已经丢失。问题的关键在于此时备用协调者无法知道之前的协调者作的决策,所以整个事务就处于hang住的状态只能等挂掉的参与者恢复才能继续。

3PC的方案

1.3pc最关键要解决的就是协调者和参与者同时挂掉的问题,所以加入了一个precommit状态标识(便于理解我把3pc的几个状态和2pc的对应成一样的,init,prepared,precommit,commit)。3pc在第一阶段协调者和参与者如果同时挂掉和2pc第一阶段一样,备用协调者看到的是prepared和init的状态或者全部是init或者全部是prepared状态,这个时候可以全部rollback。协调者如果在precommit的时候和第一个参与者同时挂掉,备用协调者看到的全是prepared的状态,可以选择rollback,挂掉的第一个参与者恢复以后如果是rollback自然ok,如果是precommit,也可以rollback,这是和2pc最大的不同。如果备用协调者看到了precommit的状态意味着之前的协调者做出的决策是precommit,可以把commit流程继续下去。
2.3pc的另一个关键是有timeout时间,所以无论是协调者或者参与者只要是活着的都知道怎么走下去。比如在发送precommit的过程中如果部分机器挂掉,部分机器是precommit的状态,部分机器是prepared的状态,协调者接收不到有些机器的响应就会发送rollback,那些precommit的机器就应该超大概率响应rollback,那些prepared的机器会超时rollback,最总达成状态一致。最后一个阶段即使部分机器接受不到commit命令最后也会在timeout以后commit达成一致。
3.3pc似乎怕断网,前面部分precommit部分prepared的状态,如果协调者发送rollback的时候precommit那些机器断网了那就状态不一致了,似乎只能报警人为干预了?

TCC

1.try,confirm,cancel,这是服务层而不是数据库的事务方案,需要在业务层面实现三个方法,需要在原有数据库表上加上一个冻结字段,在try的时候冻结,confirm的时候在目标字段上更新冻结字段,cancel的时候取消冻结
2.冻结字段的好处是不会被读到中间状态产生读不一致,缺点是侵入性强,实现三个方法复杂度大。

SAGA

1.和tcc一样是服务层的实现,有侵入性,但不需要加字段,只需要实现两个方法即正常的业务操作和反向业务操作即可。
2.和tcc比,优点是实现简单,缺点是可能会被读到中间状态类似数据库事务的读未提交。和2pc比没有2pc那样为了保证隔离性而造成的所有资源加锁。
3.做分布式事务的一种土办法是用消息系统+数据库实现软事务,业务1成功+消息发出 消息消费+业务2成功。saga可以看成这种土办法的自动化加强版本,这种状况下因为消息可能重投所以需要业务做幂等。
4.saga事务状态流转控制可以用集中控制器,也可以用基于event的分布式流转的方式。




2016年1月30日星期六

学习一下Disruptor

最近项目用到了Disruptor,趁机好好学习了一把,先上类图















几个重要的流程


发布数据


1. RingBuffer.next -> Sequencer.next 这里可能会block住取决于消费端的消费速度。
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
    long minSequence;
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
    {
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }

    this.cachedValue = minSequence;
}

2. RingBuffer.publish -> Sequencer.publish -> WaitStrategy.signalAllWhenBlocking 如果消费者在这里block住了,会通知消费者消费。


消费数据


EventProcessor -> SequenceBarrier.waitFor -> EventHandler.onEvent 如果消费速度赶上生产速度SequenceBarrier会把消费流程block住。
Disruptor提供了两种预设的消费者,BatchEventProcessor,WorkProcessor,区别在于BatchEventProcessor是批量读取消费,而WorkProcessor是单条消费一组WorkProcessor组成一个WorkerPool共享一个消费Sequence实现协同工作。

WorkerPool这种工作方式因为有多个线程共享的Sequence所以违反了单写原则是有一定性能损耗的,另一种实现协同工作的方式是使用多个BatchEventProcessor一起消费,这种情况下每个BatchEventProcessor都是消费相同的数据,但是可以在BatchEventProcessor里面通过hash取模的方式过滤掉应该被其它线程拉取的数据。


多级消费


// 方法1
disruptor.handleEventsWithWorkerPool(parserWorkPool).then(new ThenHandler());

// 方法2
SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();

for (int i = 0; i < parserThreadCount; i++) {
    handlers[i] = new DrcMsgParser(i,parserThreadCount);
    batchParseProcessors[i] = new BatchEventProcessor(
            disruptorMsgBuffer, sequenceBarrier, handlers[i]);
    disruptorMsgBuffer.addGatingSequences(batchParseProcessors[i].getSequence());
}

Sequence[] gatingSequences = new  Sequence[parserThreadCount];

for (int i = 0; i < parserThreadCount; i++) {
    gatingSequences[i]=batchParseProcessors[i].getSequence();
}

SequenceBarrier offerSquenceBarrier = disruptorMsgBuffer.newBarrier(gatingSequences);
offerProcess = new BatchEventProcessor(disruptorMsgBuffer,offerSquenceBarrier, new OperationBufferScanThread(transferer));
disruptorMsgBuffer.addGatingSequences(offerProcess.getSequence());

上面两种方法都可以实现多级消费,方法1是方法2的封装,方法2中核心是,


SequenceBarrier offerSquenceBarrier = disruptorMsgBuffer.newBarrier(gatingSequences);
offerProcess = new BatchEventProcessor(disruptorMsgBuffer,offerSquenceBarrier, new OperationBufferScanThread(transferer));


在BlockingWaitStrategy中


while ((availableSequence = dependentSequence.get()) < sequence)
{
    barrier.checkAlert();
}


次级消费者的消费Sequence会一直小于前面一级消费者的最小Sequence



单写原则(Single Writer Principle)

我之前所以为的Disruptor快的原因是,1.RingBuffer 2.防止Cache false sharing的padding 3.Cas操作,可惜1没有理解到位而3正好相反了。应该说Disruptor是单写原则的一个实践。

作者的帖子在 http://mechanical-sympathy.blogspot.co.uk/2011/09/single-writer-principle.html


在这里我寻章摘句一下,

 These lock/CAS instructions cost 10s of cycles in the best case when un-contended, plus they cause out-of-order execution for the CPU to be suspended and load/store buffers to be flushed.  At worst, collisions occur and the kernel will need to get involved and put one or more of the threads to sleep until the lock is released.  

lock/CAS指令在没有竞争的情况下需要消耗数十个cpu周期加上执行过程中可能会等待load/store buffer同步。更坏的情况下内核可能需要介入把某些线程强制休眠知道锁被释放。

It is interesting to see the emergence of numerous approaches that lend themselves to single writer solutions such as Node.js, Erlang, Actor patterns, and SEDA to name a few.  Unfortunately most use queue based implementations underneath, which breaks the single writer principle, whereas the Disruptor strives to separate the concerns so that the single writer principle can be preserved for the common cases.

现在有一些有趣实现单写原则的方式,比如说Node.js, Erlang, Actor模式和SEDA等等。可惜的是这些实现方式的底层多数都是基于queue实现的,而queue本身是违背单写原则的,反之Disruptor则力求保持单写原则。
(LinkedBlockingQueue有锁,而且head,last指针和size变量都是需要Cas的。 ConcurrentLinkedQueue也是有head,tail,next指针需要Cas)

If a system is decomposed into components that keep their own relevant state model, without a central shared model, and all communication is achieved via message passing then you have a system without contention naturally.  This type of system obeys the single writer principle if the messaging passing sub-system is not implemented as queues.



如果一个系统被分解成很多自己封装好状态的组件而没有一个中央共享的结构并且所有的通信都是基于不是用queue实现的消息系统,那这样的系统是符合单写原则的。


Disruptor的单写原则












上面这张图是单个生产者,单个消费者的交互流程。流程中有两个位置需要限流,(1)消费者等待生产者,这是通过BatchEventProcessor.sequence<Sequencer.cursor来实现的。(2)生产者等待消费者,这是通过Sequencer.cursor-bufferSize<gatingSequence来实现的。BatchEventProcessor.sequence是消费者线程在写,生产者线程在读,而Sequencer.cursor是生产者线程在写,消费者线程在读,所以都是符合单写原则的。扩展到多个消费者,因为多个消费者都是自己维护自己的sequence,所以还是能保持单写原则。不过WorkerPool这种消费方式由于多个worker会共享workSequence,所以还是有点违反单写原则的。

多个生产者的情况要复杂一些,生产者和消费者之间的交互和单个生产者的情况基本相同,但是还需要考虑生产者之间的交互,主要的逻辑在MultiProducerSequencer里面。

MultiProducerSequencer.next

current = cursor.get();
next = current + n;

long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

    if (wrapPoint > gatingSequence)
    {
        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
        continue;
    }

    gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
    break;
}

首先要说上面这段代码的最后用了Cas,所以还是有一点违背了单写原则,这个问题Disruptor的作者也是承认的。和单个生产者还有一个不同地方就是在next方法里面就更新了cursor,消费者的block就有可能被解除了。

MultiProducerSequencer.publish->setAvailable->setAvailableBufferValue

private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}


这里为了尽量维持单写原则用了一个很cool的技巧,availableBuffer是一个长度和RingBuffer相同的缓存,存的flag的含义是index这个位置上RingBuffer的生产者已经绕了几圈。

在ProcessingSequenceBarrier.waitFor里面除了等待BatchEventProcessor.sequence<Sequencer.cursor之外,会返回一个当前发布的最大sequence。

public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
    {
        if (!isAvailable(sequence))
        {
            return sequence - 1;
        }
    }

    return availableSequence;
}

在这里之前之前缓存的那个圈数就可以用来判断某个sequence是否已经published。availableBuffer虽然被多个线程写了,但是对于里面的一个slot在一个时间点是只会有一个线程去写的。


Lazy Set!

促使我仔细学习Disruptor的动力在于浏览Sequence代码的时候看到这一段

public void set(final long value)
{
    UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}

Disruptor里面默认的set方法是lazySet的,之前基本没有这么玩过,极端不明觉厉。

http://psy-lob-saw.blogspot.co.uk/2012/12/atomiclazyset-is-performance-win-for.html

这篇帖子对lazy set和单写原则之间的关系总结的很好。

1. lazy set和普通的volatile set有什么不同?

lazySet provides a preceeding store-store barrier (which is either a no-op or very cheap on current platforms), but no store-load barrier

lazySet会前置一个store-store barrier(在当前的硬件体系下或者是no-op或者非常轻),而不是store-load barrier

2. store-store barrier能保证什么?

StoreStore Barriers: The sequence: Store1; StoreStore; Store2 ensures that Store1's data are visible to other processors (i.e.,flushed to memory) before the data associated with Store2 and all subsequent store instructions.

在操作序列Store1; StoreStore; Store2中,Store1的数据会在Store2和后续写操作之前对其它处理器可见。换句话说,就是保证了对其它数据可见的写的顺序。

3. lazy set和单写原则的关系

But if there is only a single writer we don't need to do that, as we know no one will ever change the data but us.
And from that follows that strictly speaking lazySet is at the very least as correct as a volatile set for a single writer.

如果只有一个线程写我们就用不着store-load barrier,lazySet和volatile set在单写原则下面是等价的。

4. lazy set会不会影响可见性?(too lazy)

Will lazySet write actually happens in some finite time?
The most you can say from the spec is that it will be written no later than at the point that the process must write anything else in the Synchronization Order, if such a point exists. However, independently of the spec, we know that so long as any process makes progress, only a finite number of writes can be delayed. So, yes.

这一段有点绕口,不会结论是lazy set一定会可见的。但是从Doug Lea的描述中并不能保证lazySet的可见性延迟一定会很小,所以作者做了一些实验,给出的数据说明lazy set的延迟其实应该是相当小的。

http://robsjava.blogspot.jp/2013/06/a-faster-volatile.html

However, in practice, these writes usually become visible to the other threads within a few nanoseconds.

上面这篇帖子也是说lazySet的写在实践上来延迟是纳秒级。

在Disruptor里面没有setVolatile调用,主流程里面compareAndSet的调用只有一个地方,其它所有的更新都是lazySet。


Disruptor真是方寸之间深不见底,Martin Thompson请收下我的膝盖。。。


补充一些信息省的老忘老去找

1.volatile read在x86体系下面非常轻量,volatile write后面会跟StoreLoad屏障比较重,这么做是因为大多数程序都是读比写多很多。
http://brooker.co.za/blog/2012/09/10/volatile.html

2.没有StoreLoad屏障会怎样?看Peterson算法

Peterson算法用三个变量flag0, flag1, turn加循环实现了线程互斥访问,
flag0=ture先声明自已要锁
turn=1先尝试把资源让给线程1防止死锁
while flag1==true && turn==1 线程1要锁且得到了锁所以等待
flag0=false声明自己不要锁了
基于共享变量很巧妙的锁算法。。。

如果flag0=true和flag1==true重排,就有可能发生两个线程都可以同时通过while判断

3.从内存模型角度看线程0和线程1可以同时访问变量x,变量x在不同core的buffer里面,所以x的最终结果可能是错误的。但如果只有一个线程写,那它始终只能在一个core的buffer里面,所以不可能读到一个陈旧的值,然后根据这个值去计算然后更新出错误的结果

4.https://juejin.im/post/6844904144273145863 这篇文章把内存屏障彻底理解清楚了