2023年10月9日星期一

ClickHouse学习笔记

一直对大数据计算很感兴趣,ClickHouse又是感觉很酷的实现,所以花了一段时间研究,整理在这里。

Debug配置

debug上面踩了不少坑。
  1. 首先是用macbook可以build但要花4,5个小时,用台式机会快很多。
  2. 在老一点版本的clion上可以用gdb+'CMAKE_BUILD_TYPE=RelWithDebInfo'。新版默认是lldb和RelWithDebInfo同时用有点问题,捕捉不到断点。
  3. 最后解决办法是在ubuntu台式机上编译,RelWithDebInfo模式,Debug模式还是有点慢而且debug过程中异常退出的概率有点大,然后clion启动remote debug连接ubuntu上面的gdbserver
步骤
  1. ubuntu上面build CMAKE_BUILD_TYPE=RelWithDebInfo
  2. 启动gdbserver gdbserver :1234 build_light/programs/clickhouse server --config-file=/home/hy/config.xml 需要config.xml打开clickhouse的远程连接
  3. clion上面启动remote debug,关键步骤是需要把ubuntu上面编译的可执行文件下载下来,然后设置成debug环境的Symbol file
  4. 在clion的gdb交互窗口输入 handle SIGUSR1 noprint nostop ,handle SIGUSR2 noprint nostop 或者加到gdbinit
  5. stl pretty print,如果有python问题,可以使用自己的gdb。clickhouse使用的是libc++而不是libstdc++,使用https://github.com/koutheir/libcxx-pretty-printers
  6. 查看stl range的方法,reinterpret_cast<uint64_t &>(key_range.left.storage)

重要流程

启动

  1. main(main.cpp)->mainEntryClickHouseServer->Poco::Util::ServerApplication:run->Server::main(Server.cpp)->Poco::Net::TCPServer.start

请求处理

  1. TCPHandler::runImpl->receivePacket->executeQuery->processInsertQuery/processOrdinaryQuery
  2. bio的模式一个线程一个连接

词法分析

  1. Lexer::nextTokenImpl 典型的一个一个token的处理会回看上一个token

语法分析

  1. IParserBase::parse 所有类型parser都会经过这一步,在expected里面记录parse的轨迹。add过程可能触发Lexer::nextTokenImpl(看index是否大于现在scan过的)
  2. ignore方法寻找并跳过某个关键字
  3. 最简单的例子'select * from test',调用链ParserQueryWithOutput::parseImpl->ParserSelectWithUnionQuery::parseImpl->ParserUnionList::parseImpl->ParserUnionQueryElement::parseImpl->ParserSelectQuery::parseImpl(这里逻辑最多)->ParserTablesInSelectQueryElement::parseImpl->ParserTableExpression::parseImpl->ParserCompoundIdentifier::parseImpl(到这里生成table的ast) 经典的递归下降,会产生ast

AST重写

  1. InterpreterSelectQuery::InterpreterSelectQuery->ApplyWithSubqueryVisitor 把with对应的subquery解析出来放map里面
  2. ApplyWithAliasVisitor 也是处理with好像只全局处理没有递归下降
  3. RewriteCountDistinctFunctionVisitor 把count(distinct)替换成count(group by)
  4. QueryAliasesVisitor 访问ast把所有的别名放在一个map里面
  5. ExecuteScalarSubqueriesVisitor 把子查询替换成常量,先查缓存,如果查不到使用interpreter getQueryInterpreter解释执行成block然后缓存。worthConvertingToLiteral 复杂类型不做替换需要的计算比较多
  6. QueryAliasesMatcher 给每个子查询设置一个单独的别名
  7. rewriteMultipleJoins 重写join 
    1. CrossToInnerJoinMatcher 把不同表之间的=表达式(外部)移到join里面 
    2. JoinToSubqueryTransformMatcher 'select * from t1 join t2 on ... join t3 on ... join t4 on ...' 改写成 'select * from (select * from t1 join t2 on ...) join t3 on ...) join t4 on ...'
  8. JoinedTables::rewriteDistributedInAndJoins
    1. InJoinSubqueriesPreprocessor 关联到distributed_product_mode配置,意义在于分布式子查询的优化,最理想的模式是被关联的表sharding的状态下在每个sharding上做local关联。参考分布式子查询。代码做的事情较简单,根据实际情况加GLOBAL,或者改库表名成remote模式这样分布式存储可以识别并本地化
    2. RenameQualifiedIdentifiersVisitor 改写子查询中的列名,去使用新的表名
  9. additional_table_filters 参考core settings 额外过滤条件
  10. joined_tables.makeTableJoin->replaceJoinedTable 谓词下推,把join table替换成(SELECT * FROM table_name) AS table_short_name

继续AST重写

  1. analyzeFunctionParamValues->FunctionParameterValuesVisitor 收集参数视图参数,replaceQueryParametersIfParametrizedView 参数替换
  2. replaceWithSubquery 在查询语句里用物化视图的sql语句替换视图名
  3. TreeRewriter.analyzeSelect 一系列处理比如别名替换,标量子查询替换成常量,多余的列删除...
  4. renameDuplicatedColumns 重复列改名
  5. TreeOptimizer::optimizeCountConstantAndSumOne 语义和count等同的聚合函数改写成count
  6. translateQualifiedNames db.table.column, table.column类型的名字变成正常的名字,*这样的列名扩充成实际列名
  7. LogicalExpressionsOptimizer 布尔表达式优化,一串or变成in
  8. TreeRewriter::normalize 用户定义的sql function apply(函数名替换成函数体),count_distinct,distinct_if等等几个函数apply,EXISTS(subquery)替换,位置参数改成列名,nullin系列表达式处理,函数名改成小写,公共子表达式删除
  9. expandGroupByAll 把group by all里面的all变成实际列
  10. removeUnneededColumnsFromSelectClause 冗余列删除
  11. executeScalarSubqueries 标量子查询替换
  12. PredicateExpressionsOptimizer.optimize where和prewhere里面的谓词下推到子查询里面,https://github.com/ClickHouse/ClickHouse/pull/2015#issuecomment-374283452
  13. TreeOptimizer::optimizeIf 完成了子查询标量替换之后先尝试直接判断标量把整个表达式替换成条件分支,optimize_if_chain_to_multiif 把三元表达式优化成multiple if
  14. TreeOptimizer::apply后面有一批优化在这个函数里面
  15. optimizeFunctionsToSubcolumns 类似于这样'length(arr)' -> 'arr.size0'
  16. optimizeAggregationFunctions 聚合函数里面的数学运算外提'sum(a * 2)' -> 'sum(a) * 2'
  17. convertQueryToCNF 转换成cnf形式,简单说就是最外层全是and这种形式在某些情况更容易使用索引
  18. optimizeWithConstraints 使用constraints关键字的信息干掉查询中多余的部分
  19. optimizeSubstituteColumn 使用constraints信息做列替换
  20. optimizeGroupBy 从groupby里面干掉单射函数和常量表达式
  21. optimizeGroupByFunctionKeys干掉groupby里面的参数不是groupby key的function(不太确定?)
  22. optimizeAnyFunctions所有操作移动到any function之外any(f(x, y, g(z))) -> f(any(x), any(y), g(any(z)))
  23. optimizeCountConstantAndSumOne 把count(常量)和sum(1)这种的简单变成count
  24. optimizeMultiIfToIf multiIf->if在参数个数合适的情况下
  25. optimizeSumIfFunctions 类似sumIf(123, cond) -> 123 * countIf(1, cond)
  26. optimizeArrayExistsFunctions 类似于arrayExists(x -> x = 1, arr) -> has(arr, 1)
  27. optimizeInjectiveFunctionsInsideUniq移除uniq里面的单射函数
  28. optimizeAggregateFunctionsOfGroupByKeys移除group by key上面的min/max等等聚合函数
  29. optimizeRedundantFunctionsInOrderBy ORDER BY x, y, f(x), g(x, y), f(h(x)), t(f(x), g(x)) -> ORDER BY x, y
  30. optimizeMonotonousFunctionsInOrderBy order by里面的单调函数替换成其参数
  31. optimizeDuplicatesInOrderBy 移除order by里面的多余元素,可能是之前的优化产生
  32. transformIfStringsIntoEnum if中的string参数转换成enum
  33. optimizeLimitBy干掉limit by中的重复元素
  34. optimizeUsing干掉using中的重复列
  35. optimizeOrLikeChain 'or xxx like '->multiMatchAny
  36. replaceAliasColumnsInQuery 查询里面的别名展开替换 `day` Date ALIAS toDate(timestamp),`day1` Date ALIAS day + 1,`day2` Date ALIAS day1 + 1 查询条件where day2 = today()展开成 ((toDate(timestamp) + 1) + 1) = today() 
  37. RewriteOrderByVisitor ORDER BY (a, b) -> ORDER BY a,b
  38. MergeTreeWhereOptimizer 计算并尝试把where里面的表达式移动进prewhere,主要是根据一些特殊表达式里面可以判断列的大小来做判断,比如"column_name = constant"
  39. generateFilterActions 根据additional_filter_ast生成filter action
  40. ExpressionAnalyzer::getRootActions 调用ActionsVisitor遍历AST表达式树,生成ExpressionActions,结构不复杂就是遍历ast生成对应的action

执行计划生成

  1. InterpreterSelectWithUnionQuery::execute->buildQueryPlan->executeImpl
  2. prepared_pipe时会增加ReadFromPreparedSource,从远程表读取
  3. executeFetchColumns 作用是返回读取columns数据的stream,可以理解为最基础的数据拉取stream,有count优化,limit优化等等
  4. preliminary_sort 添加order by, limit, distinct等等step
  5. expressions.first_stage 只用远程执行query的场景,添加step,FilterStep,before_array_join,ArrayJoinStep,before_join,converting_join_columns,join step(稍复杂),where step,aggregate step
  6. need_aggregate 有聚合的场景分片上不执行later-stage动作,在windows函数/ORDER BY之前不执行,need_aggregate=false 一些其它的执行顺序的逻辑
  7. expressions.second_stage 分布式查询场景中的聚合节点
  8. executeMergeAggregated从不同节点拉数据合并,having rollup cube等等逻辑
  9. executeWindow 添加windows函数相关的steps
  10. order by相关逻辑,有一些merge sort的逻辑
  11. limit projection offset相关的处理
  12. executeWithFill 处理with fill逻辑
  13. executeSubqueriesInSetsAndJoins 需要子查询结果构建set

pipeline构建

  1. ClickHouse 源码解析(一):SQL 的一生(上)
  2. QueryPlan::buildQueryPipeline->optimize
    1. QueryPlanOptimizations::tryRemoveRedundantSorting 尝试删除多余排序
    2. QueryPlanOptimizations::optimizeTreeFirstPass 深度遍历plan tree,挨个执行getOptimizations里面所有的优化比如 tryPushDownFilter等等
    3. QueryPlanOptimizations::optimizeTreeSecondPass 同样是遍历plan tree
    4. optimizeReadInOrder-同时使用limit和order by如果order by和表顺序一致的优化,变成FinishSorting
    5. aggregation_in_order-GROUP BY表达式至少包含排序键的前缀或注入函数。当从表中读取一个新的键时,聚合的中间结果可以被最终确定并发送给客户端,distinct_in_order distinct上面类似的优化,
    6. optimizePrimaryKeyCondition-不太明白,enableMemoryBoundMerging-enable_memory_bound_merging_of_aggregation_results相关的设置
  3. while (!stack.empty()) ... DFS方式生成pipeline,生成过程调用updatePipeline函数
  4. ISourceStep::updatePipeline(创建并初始化pipeline)->ITransformingStep::updatePipeline(往pipeline添加transformer)
  5. ReadFromMergeTree::initializePipeline->spreadMarkRanges->readInOrder->createSource 这样就有了从源数据读的processor
  6. LimitStep::transformPipeline 给pipeline添加transformer

pipeline执行

  1. ClickHouse 源码解析(二):SQL 的一生(中)
  2. ExecutingGraph::initializeExecution入口函数,将没有出边的node入栈(direct_edges),然后弹出反复调用updateNode
  3. ExecutingGraph::updateNode,做反向状态传播把output node需要数据的状态传播到input,后面是细节展开
  4. processor.prepare(node.updated_input_ports, node.updated_output_ports) 调用 Node 对应 IProcessor::prepare() 方法尝试pull数据
  5. IOutputFormat::prepare(),更新相邻边的状态和自己节点的状态,如果没有数据就设置成NeedData
  6. if (!need_expand_pipeline) 将 Node 相邻的待更新 Edge 放入 update_edges 这个栈中
  7. if (updated_processors.empty()) 简单来说,就是将Edge所指向的Node放入update_processors栈中
  8. ISource::prepare()最终会执行到这里,由于ISource没有 input,则直接返回Status::Ready
  9. case IProcessor::Status::Ready: queue.push(&node) input节点放入入口传入的queue里面,ExecutingGraph初始化完成
  10. ClickHouse 源码解析(三):SQL 的一生(下)
  11. PipelineExecutor::executeImpl 调度执行每个节点 executeStepImpl 单线程执行分支
  12. tasks.tryGetTask->context.executeTask->graph->updateNode执行完节点后更新相邻节点的状态->tasks.pushTasks新的task入全局队列
  13. executeTask->executeJob->IProcessor::work()算子处理数据、传递数据的关键方法
  14. ISource::work() tryGenerate()生成数据返回chunk
  15. spawnThreads() 多线程分支,比较简单,起多个线程threads.emplace_back(后面有没有线程池不确定) 同时从async_task_queue里面拿数据执行,执行流程和单线程类似

MergeTree读链路

  1. CLICKHOUSE 源码解析: MERGETREE READ PATH
  2. InterpreterSelectQuery::executeFetchColumns->StorageMergeTree::read->MergeTreeDataSelectExecutor::read 入口
  3. MergeTreeDataSelectExecutor::filterPartsByPartition 根据partition(比如date分区)筛选parts selectPartsToRead里面根据minmax_idx_condition过滤。之前的调用链MergeTreeDataSelectExecutor::read->readFromParts->ReadFromMergeTree::initializePipeline->getAnalysisResult->selectRangesToRead
  4. MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes 根据主键或者索引筛选出要读取的part range
    1. markRangesFromPKRange 根据 primary key筛选part下的ranges pk在内存里
    2. filterMarksUsingIndex 根据二级索引筛选ranges 可能会读磁盘
    3. process_part 可能并行选取ranges,这里为止都属于selectRangesToRead
  5. ReadFromMergeTree::spreadMarkRangesAmongStreams->ReadFromMergeTree::read->ReadFromMergeTree::readFromPool 后面是实际读取数据的过程
  6. MergeTreeReadPool::fillPerThreadInfo 将读取任务分配到多个线程,按Disk分配线程负载,最大化磁盘IO带宽利用率,将所有 Part 按物理存储磁盘排序,按序将磁盘负载分散到 N 个线程
  7. MergeTreeReadPool::getTask 并不会将线程需要完成的所有任务初始时全部分配,而是每次仅会分配一个小量级的数据读取任务。通过参数: merge_tree_min_rows_for_concurrent_read / merge_tree_min_bytes_for_concurrent_read 配置读取的数据量级。
  8. MergeTreeRangeReader::read 先读取Pre-Where相关Columns,过程中,还会对数据进行Filter,如果 Filter后无数据,便不会进行后续的Read Remaining Columns步骤。
  9. MergeTreeReaderWide::readRows 函数读取范围行数内的所有列的数据。分别对每一列数据,使用对应的反序列化方法进行读取 readRows->readData->serialization(...)反序列化
  10. SerializationString::deserializeBinaryBulk->deserializeBinarySSE2 读数据并复制到data,可以开simd优化,_mm_storeu_si128 / _mm_loadu_si128为C++编译器提供的SIMD内置函数 _mm_loadu_si128 -> SSE2 指令 MOVDQU _mm_storeu_si128 -> SSE2 指令 MOVDQU 此处为较为简单的 SIMD 应用场景: 快速内存复制 (一次性复制更多字节内存)

MergeTree写链路

  1. CLICKHOUSE 源码解析: MERGETREE WRITE-PATH
  2. InterpreterInsertQuery::buildChainImpl->SinkToStoragePtr write->MergeTreeSink::consume 入口
  3. MergeTreeSink::consume 将Block按照Partition拆成N个Block, 写入Part到Temp目录防止Dirty Read, 将TempPart挂载到表空间下, 这样便可以被客户端查询到数据, 触发一些后台执行的任务(例如执行Merge)
  4. MergeTreeDataWriter::splitBlockIntoParts通过executePartitionByExpression()计算出与partition相关的字段(可能会新增字段),通过buildScatterSelector()计算出row与partition之间的映射关系,通过IColumn::scatter()将每个Column都完成一拆多,最终组合为多个Block
  5. MergeTreeDataWriter::writeTempPart 写数据的具体实现
    1. minmax_idx->update(…) 对 Partition 涉及的字段统计 min-max 值,作为额外的谓词下推索引
    2. stableGetPermutation() 对 Block 数据按照 Order By 字段排序,会预先检查是否排序,因此如果数据已经排过序再写入,事实上写入速度会有提升,stableGetPermutation 为稳定排序, 底层实现为 std::stable_sort,时间复杂度为 O(N((logN)^2))
    3. updateTTL(…) 是为支持 TTL 数据生命周期管理做的数据 min-max 索引,用于快速判断一个 Part 是否命中 TTL
    4. MergeTreeData::createPart 创建 Part 实例 注意 choosePartType() 会按照 Block 大小和行数决定是否使用 Compact/Memory 存储方式,默认为 Wide 方式存储
    5. MergedBlockOutputStream::writeWithPermutation, 实现Block数据的写入 MergedBlockOutputStream::writeImpl->MergeTreeDataPartWriterWide::write首先由computeIndexGranularity() 计算当前Block的IndexGranularity getBlockAndPermute->column->permute会对column排序 writeColumn 将每个 Column 分别写入磁盘,calculateAndSerializePrimaryIndex、calculateAndSerializeSkipIndices 写入主键、二级索引
    6. MergeTreeDataPartWriterWide::writeColumn对Column中的每个granule分别执行 writeSingleGranule()写入Column内容到.bin flushMarkToFile()写入Column.mrk索引
    7. MergeTreeDataPartWriterWide::writeSingleGranule->SerializationString::serializeBinaryBulk(完成序列化,根据数据类型会有不同)ColumnString中的多个StringValue是连续组织的,使用Offsets来记录不同的String边界。序列化时,对每个StringValue先写入长度,再写入String内容,这里是写入到WriteBuffer中,真正落地到磁盘还会经过一次压缩

MergeTree Merge

  1. ClickHouse 源码解析: MergeTree Merge 算法
  2. 每次插入数据都会在后台尝试merge,但可能因为选不出合适的part跳过
  3. 选择part的逻辑,首先加入只选择相邻part的限制,然后遍历所有组合两层for循环组合嵌套,选择min sum_parts_size / (parts_num - 1),就是尽量选小而多的组合,比较碎片。整体上说应该是universal merge,n层几个merge出来生成n+1层
  4. merge算法是k-way merge+k堆,最常规做法。合并数据有两种做法,Horizontal 与 Vertical,Horizontal简单每次写一整行,Vertical按列写比较向量化,先写主键以及对应的part,然后拿着part信息再写其它列
  5. StorageMergeTree::scheduleDataProcessingJob->SimpleMergeSelector::select 选择需要merge的parts的具体实现
  6. MergingSortedAlgorithm::mergeImpl Merge的实现,就是循环的从待Merge的Part中(存放于queue对象中),找到最小的数据行,并将其放到输出结果集中queue的实现为SortingHeap。针对 Vertical 算法,需要额外输出 row_source,即输出数据行对应的原始 Part 编号
  7. ColumnGathererStream::gather VerticalMergeStage 阶段的主要任务就是为剩余的Gatherer Columns 包含的列进行输出

Group By原理

  1. MySQL分组查询Group By实现原理详解 mysql group by本质上说是一种排序,可以利用索引事先排好序,如果不能直接从索引得出group by就需要在临时表排序
  2. ClickHouse之聚合功能源码分析 源码分析 | ClickHouse和他的朋友们(15)Group By 为什么这么快
  3. InterpreterSelectQuery::executeImpl->executeAggregation 初始化各种配置项,构造AggregatingStep,并将其添加到query_plan中
  4. AggregatingStep::transformPipeline 对于每个上游的数据流,会构造一个AggregatingTransform的节点进行预聚合,最后pipeline resize了一下,避免了下游单线程跑可能性能差的问题(https://github.com/ClickHouse/ClickHouse/issues/35096)
  5. AggregatingTransform::work->consume 预聚合阶段,通过调用aggregator.executeOnBlock(...)函数执行预聚合
  6. AggregatingTransform::work->initGenerate 扩展pipeline,合并阶段,当预聚合阶段结束(上游通道关闭,或者聚合行数到达了设定的上限),通过扩展pipeline替换上游节点,然后等待合并数据。这里根据aggregator在预聚合过程中是否因为限制内存使用而将数据写到磁盘文件,扩展的节点是不同的
    1. 如果没有写到磁盘文件,扩展ConvertingAggregatedToChunksTransform节点
    2. 否则扩展SourceFromNativeStream,GroupingAggregatedTransform,MergingAggregatedBucketTransform,SortingAggregatedTransform
    3. GroupingAggregatedTransform 将single_level block转化为two_level block,并按照block_num进行组合,然后交给MergingAggregatedBucketTransform进行合并操作
    4. MergingAggregatedBucketTransform 进行合并操作,因为MergingAggregatedBucketTransform可以有多个,因此合并阶段也可以是并行的
    5. ConvertingAggregatedToChunksTransform 如果预聚合数据是two_level block,则扩展节点进行并行合并,然后在本节点进行sort;否则直接在本节点合并
  7. ExecutingGraph::updateNode->AggregatingTransform::expandPipeline AggregatingTransform构造了一个新的input_port,和扩展节点中的最下游节点的output_port连接起来,下次执行prepare()函数的时候,获取的input_port是新构造的那个,这里实际上等价于切换了上游数据流,切换完成。从预聚合阶段切换到合并阶段的,这部分内容也是典型的运行时扩展Pipeline的案例:需要根据计算时的数据动态的判断之后需要执行的节点类型和结构
  8. Aggregator::Aggregator 记录预聚合前的内存使用,作为是否将预聚合数据写入磁盘文件的依据,每个聚合函数有个对应的State对象,该对象作为预聚合过程中内部数据的存储点,一个sql语句中可以有多个聚合函数,ClickHouse中是将多个聚合函数的State对象分配在一整块内存上的,因此,这里需要计算每个State对象的大小和偏移量,根据键类型选择合适的哈希表。
  9. Aggregator::chooseAggregationMethod 这里根据“grouping key” 的数量、特点(lowCardinality、isNullable、isFixedString)等性质,选择合适的哈希表类型,默认选择serialized类型的哈希表,这个哈希表的键就是将多个“grouping key”拼接,针对个别类型的哈希表,构造cache
  10. Aggregator::executeOnBlock 执行预聚合的接口
    1. initDataVariantsWithSizeHint result(类型AggregatedDataVariants)是一个out型参数,实际的哈希表也是在这个对象中,这里会执行初始化操作,即根据aggregator选择的哈希表类型来初始化对应的哈希表
    2. materialize columns ClickHouse中有些列不能在聚合操作中直接使用,比如Const Column、Sparse Column等。这里对“grouping key”中这些列做了具化处理(即格式转换为普通格式
    3. prepareAggregateInstructions 这个函数内部是聚合函数的参数拼接的过程,聚合函数的参数,根据名字找到对应的列数据
    4. executeWithoutKeyImpl / executeImpl 执行具体的聚合操作
    5. convertToTwoLevel和writeToTemporaryFile 聚合操作之后,判断是否要将单层哈希表转换为双层,以及是否将数据写到磁盘文件中。
  11. executeImpl->Aggregator::executeImplBatch 遍历需要聚合的行,对每一行我们计算其哈希表中的键,如果这个键在哈希表中不存在,则通过aggregates_pool->alignedAlloc申请一个内存块,并在内存块上初始化每个聚合函数的State对象,遍历聚合函数,依次执行预聚合操作
    1. 如果聚合key是8 bit, addBatchLookupTable8 数据根据key放到4*256的空间里面,256就是uint8的空间,4可能是因为前面把数据处理成这样。然后调用add, merge这个标准流程处理数据
    2. aggregates_pool->alignedAlloc申请内存给createAggregateStates创建新的State对象,这里是可以用上jit的
    3. addBatch 使用聚合函数做聚合

函数调用向量化实现

  1. ClickHouse源码笔记3:函数调用的向量化实现
  2. SELECT a, abs(b) FROM test 对应的stream ExpressionBlockInputStream->ExpressionBlockInputStream(生成abs(b),然后删除列b替换成abs(b))->TinyLogBlockInputStream 这块的实现已经改了,看ExpressionTransform
  3. ExpressionActions::executeAction-> FunctionUnaryArithmetic.executeImpl-> UnaryOperationImpl::vector-> AbsImpl.apply 这就是一个完美的向量化优化代码,没有任何if, switch, break的分支跳转语句,for循环的长度也是已知的

select where的实现

  1. ClickHouse源码笔记4:FilterBlockInputStream, 探寻where,having的实现
  2. FilterTransform::FilterTransform 寻找filter column的位置,判断是不是常量列
  3. FilterTransform::doTransform 执行filter表达式,生成FilterDescription并通过其过滤其他的每个列
  4. ColumnVector<T>::filter 对于每个列,遍历bool数组,然后将合法数据塞进一个新的列之中,最终新的列替换旧的列,就完成了一列数据的过滤。TargetSpecific::AVX512VBMI2::doFilterAligned也可以打开avx512指令集优化

排序实现

  1. ClickHouse源码笔记6:探究列式存储系统的排序
  2. PartialSorting->MergeSorting 两个重要的pipeline
  3. Partialsortingtransform::Transform->sortBlock 对单个block排序,相当于处理一个batch
  4. getBlockSortPermutationImpl->ColumnVector<T>::getPermutation->ColumnVector<T>::permute 先通过单列排序,拿到每一列在排序之后的IColumn::Permutation perm。然后Block之中的每一列都利用这个perm, 生成一个新的排序列,替换旧的列之后,就完成Block的排序了
  5. ColumnVector<T>::getPermutation 如果有limit做std::partial_sort,如果数字类型考虑使用基数排序,否则使用默认的快排
  6. ColumnVector<T>::permute 简单的根据map(排序列和被排序列之间的位置索引)生成一个新的排序后的列
  7. MergeSortingTransform::consume 从source stream读取到memory block,如果超出限额就持久化到文件做外部排序,增加BufferingToFileTransform,MergingSortedTransform两个processor
  8. 排序算法因为数据依赖没做太多simd优化,有些技巧估计还没普及

分布DDL

  1. ClickHouse 分布式DDL 执行原理剖析
  2. InterpreterAlterQuery::executeToTable->executeDDLQueryOnCluster 判断是否是分布式ddl
  3. DDLWorker::enqueueQuery 往zk的distributed_ddl目录提交queue-000xxx记录,创建ddl task和子目录(/distributrd_ddl/queue-000xxx, /distributed_ddl/queue-000xxx/active, /distributed_ddl/queue-000xxx/finished)
  4. getDistributedDDLStatus 阻塞等待各个节点工作完成,扫描zk目录(/distributed_ddl/queue-000xxx/finished)
  5. DDLWorker::runMainThread->scheduleTasks->processTask->tryExecuteQueryOnLeaderReplica 需要leader角色的抢leader锁
  6. replicated_storage->getStatus ->tryExecuteQuery 获得副本锁,提交非分布式但可能多replica的ddl任务-副本间任务({zk_path}/log/log-00000xxx {zk_path}/mutations/0000xxx)
  7. waitForLogEntryToBeProcessedIfNecessary 等待副本间任务都完成
  8. StorageReplicatedMergeTree::queueUpdatingTask->pullLogsToQueue 同步/{zk_path}/log/log-00000xxx到自己的副本目录/{zk_path}/replicas/{replica}/queue/queue-0000yyy
  9. BackgroundJobsAssignee::threadFunc->scheduleDataProcessingJob->executeLogEntry->MergeTreeBackgroundExecutor<Queue>::threadFunction()->task->executeStep() 每个副本拉取任务执行
  10. tryFinalizeMutations 更新表示完成的zk状态

join原理

本地join

  1. Nested Loop最基础的join方式,双重循环性能差
  2. Block Nested-Loop Join 使用join buffer,每次外层循环读取多条数据放进buffer,然后在内层一次执行完这多条外层数据的比较
  3. Sort Merge Join 外层,内层的数据先根据join key排序,然后有序merge,优点是使用内存可控
  4. hash join,小表放进hash,然后大表逐条记录hash lookup,如果小表无法放进内存就需要dump到文件,快但耗内存。

分布式join

  1. shuffle join 根据join key做hash把数据拆到不同的计算节点,这样每个节点都是local join,然后合并不同节点的结果
  2. broadcast join 小表广播,把小表复制到所有大表数据节点形成local join
  3. co-located join 数据已经根绝join key分好片了,天然的local join

Clickhouse的join

  1. clickhouse的本地join先尝试hash join,内存超出就merge join
  2. ClickHouse Join为什么被大家诟病? 
  3. 分布式join是两阶段执行的模型,协调者转发请求到多个worker节点,收集worker节点的计算结果做汇总计算
  4. 普通分布式Join,协调者分发左表到n个worker节点,worker节点去其他n个节点去拿右表的全部数据,形成n*n数据传输读取,ck未能真正实现shuffle
  5. global join,为了避免每个worker重复拉取一遍右表数据,改成由协调者拉取一遍右表数据然后传输到worker节点,能避免右表的重复计算,但是有大量的重复传输
  6. Colocate JOIN,可以在生成表的时候就按join key方式分片,然后用一个分布式表join一个local表,实现的效果就是colocate join 
  7. 两阶段模型,第二阶段如果计算密集比如count distinct协调者会成为瓶颈
  8. 不支持Shuffle,如果右表是大表,没法做多节点并行优化
  9. 有些场景过滤条件下推做的不好需要人工改sql优化
  10. 没有runtime filter,runtime filter在右表很小的时候可以作为filter提前应用到左表避免很多data load和hash probe

hash join流程

  1. ClickHouse之本地Join(hash join部分)源码分析(doing)
  2. FillingRightJoinSideTransform.work->HashJoin::addJoinedBlock->joinDispatch->insertFromBlockImpl->insertFromBlockImplTypeCase 使用右表构建hashtable,数据结构看RightTableData.maps
  3. JoiningTransform::transform->HashJoin::joinBlock crossjoin直接笛卡尔积,right或者full join物化一下稀疏或者常量列,然后转到joinBlockImpl
  4. joinBlockImpl switchJoinRightColumns返回一个row_filter,这个row_filter记录了左表block中哪些行需要保留,joinRightColumns查map看是否保留,block.insert(added_columns.moveColumn(i)) 将added_columns拼接到左表的block中,block.safeGetByPosition(i).column->filter(row_filter, -1) 这里执行真正的过滤操作

两阶段分布式执行

查询

  1. ClickHouse 分布式DDL 执行原理剖析
  2. StorageDistributed::read sql重写主要是替换库名表名,ClusterProxy::executeQuery主要是修改和设置一些配置 SelectStreamFactory::createForShard 决定选择本地副本还是远程副本
  3. pool->getManyChecked 这里会去连接池中获取远端的entry,entry中包含着请求其他server的必要信息 ConnectionPoolWithFailover::getManyImpl->getMany 真正获取entry并进行排序的过程
  4. 简单说就是遍历所有shard,然后选择合适的方式连接每个shard,生成对应的stream
  5. 生成stream的逻辑在emplace_local_stream,emplace_remote_stream等等几个匿名函数,实际读取数据的逻辑也在里面
  6. ReadFromRemote::initializePipeline会生成lazily_create_stream,里面会用到ConnectionPoolWithFailover::getManyImpl

写入

  1. 同步写入是指数据直写入实际的表中,而异步写入是指数据首先被写入本地文件系统,然后发送到远端节点。异步写是主流
  2. 主要逻辑就是根据sharding_key进行拆分,然后写到不同的位置,有目录加锁逻辑

一些优化点

groupBitmap替换uniqExact

  1. 为什么groupBitmap比uniqExact快?
  2. 该优化在select countDistinct的时候使用,高基维效果明显
  3. 关键区别在merge操作,只有分布式场景需要merge。分布式场景先在数据节点执行部分聚合,然后都传到计算节点做合并然后排序。决定是否做分布式聚合的代码在 Aggregator::mergeOnBlock

基于规则的优化

  1. ClickHouse SQL 的十项优化规则 多数优化可以在前面的ast重写的prewhere部分找到

一些其他系统的sql优化

  1. SQL 子查询的优化 关联子查询join下推,算子上提
  2. 子查询相关的优化 tidb的一些子查询优化,相对简单
  3. Max/Min消除,在索引列上的时候转换成order by limit 1就只用取一行
  4. 谓词下推,下推到理数据源尽量近的地方,很好理解
  5. Join Reorder 算法简介 Join Reorder,多表join,根据表的数据量分布选择合适的join顺序

jit原理

  1. JIT in ClickHouse
  2. USE_EMBEDDED_COMPILER 启动jit编译 ExpressionActions::ExpressionActions->ActionsDAG::compileExpressions->ActionsDAG::compileFunctions->compileFunction 这里实际生成llvm的ir,在子类的compileImpl里面补全更多细节(搜关键字llvm::IRBuilderBase) 一个很容易理解的例子是AggregateFunctionCount
  3. llvm生成ir的用法可以参考链接里面的例子
  4. SELECT a + b * c + 5 -> plus(plus(a, multiply(b, c)), 5) 会编译压平成一个function,尽量simd
  5. aggregate functions 有很多虚函数,有不少wrapper,也是通过jit压平。多个列上的聚合函数也可以合并成一个大聚合函数。
  6. 对于排序,如果要比较多个列就是多个虚函数调用,这里也可以jit压平成一个函数提升性能
  7. CHJIT::compileModule->JITCompiler.compile->llvm::legacy::PassManager 代码生成,结果存储在llvm::MemoryBuffer
  8. llvm::RuntimeDyld->loadObject->resolveRelocations 动态链接器符号解析,重定位
  9. compiled_module.function_name_to_symbol.emplace 维护<function name>到生成的符号的映射
  10. module_identifier_to_memory_manager[current_module_key] 维护函数(Module)到MemoryManager的映射

bitmap的一些细节

  1. createAggregateFunctionBitmap<AggregateFunctionGroupBitmapData> 创建bitmap
  2. SmallSet roaring::Roaring64Map roaring::Roaring 三种实现
  3. roaring_array_s 32位核心数据结构,维护container(数据桶), key(桶编号), typecodes(桶类型)等数据结构
  4. roaring_bitmap_and->container_and->bitset_bitset_container_intersection->bitset_container_and_justcard->_mm256_and_si256 and流程
  5. Roaring64Map 会重用32位的Roaring结构,RoaringBitmap64是由一系列RoaringBitmap32表示。实现方式有很多种,一种比较通用的做法用map存储,是把前32位存成key,value是后32所对应的RoaringBitmap32。前32位key的做法和32位roaring的高16位做法类似

action的一些细节

  1. action类型等等描述(ActionDAG.h),描述计算结构的一种通用结构,最有用的应该是function类型的actionType
  2. action创建是ConstInDepthNodeVisitor<ActionsMatcher, true> visit产生
  3. 通过ExpressionActions挂着的,貌似会挂到很多地方
  4. 实际执行在ExpressionActions::execute
  5. 执行点案例MergeTreeRangeReader::MergeTreeRangeReader

数据存储格式




2023年10月3日星期二

SIMD相关

原理

  • Add ALUs to increase compute capability
  • 应该就是增加计算单元提升算力,缺点可能是要占用芯片面积就没法加其他东西了,毕竟使用场景有限,所以avx512会因为体积和能耗被某些人鄙视。

字符查找

  •  Looking for an index of an element in array via SIMD. A fast way
  • 16个8位字符打包成128位__m128i ARR = _mm_setr_epi8(1,2,3...)
  • 设置被比较数字的vector__m128i N = _mm_set1_epi8(3)
  • 比较并得到按bit的mask,__m128i cmp = _mm_cmpeq_epi8(ARR, N); int mask = _mm_movemask_epi8(cmp);
  • _tzcnt_u32返回尾部最小有效零位的个数,也就是第一个满足条件的字符的位置

基数排序

  • Bitwise MSB Radix Sort on AVX-512
  • 基于bit的MSB基数排序,从高位开始每bit做比较像快排一样做左右划分,这样不需要额外的空间去做基数归类
  • simd排序有个麻烦的点是怎样搬动比较key之外的数据,这里的做法是看成kv并打包在一起,如果v是64位指针kv就是128位,512位的vector可以放4个,如果v是32位的位置kv是64位放8个
  • 每次拿vector中所有key中的一位进行比较得到一个mask, testAndCount(bitMaskVec, keyPayload, sortBits, popcnt)
  • 然后关键点是mask_compressstoreu这个指令,通过sortBits,把keyPayload符合条件的位对应的数据写到d + writePos[0],这样就实现了搬动数据
  • 但是感觉一位一位排序似乎效率有点低,把simd的优势又还回去了?

排序网络(Bitonic Sorting Network)

  • Bitonic_sorter 
  • 双调排序,适合并行度比较大的场景,双调序列是先单调递增然后单调递减的序列,将双调序列等分然后两个子序列一一比较得到min和max序列又都是双调序列。排序算法是先从原子一级级组成一整个双调序列,然后再一级级拆散最后就是排序好的
  • 复杂度是n*logn^2,但并行度很好
  • simd算法详见A Novel Hybrid Quicksort Algorithm Vectorized using AVX-512 on Intel Skylake

快速排序

  • Fast Quicksort Implementation Using AVX Instructions
  • 假设一次操作4个数,VBROADCASTSS (pivot), P (设置pivot)
  • VMOVDQU (in), D (取数组成向量)
  • VPCMPGTD D, P, C (向量和pivot比较)
  • VMOVMSKPS C, r (生成按bit的mask)
  • SHL $4, r VPERMILPS permTableLesser(, r), D, L VPERMILPS permTableGreater(, r), D, G (用mask查表得到两个向量内位置list,然后用vpermilps指令从源向量取数到目标寄存器,完成了partition这一步)
  • permTableLesser这种表是预先构建好的,因为mask寄存器只有16种可能对应的位置都是确定的
  • POPCNT r, r0 MOV $4, r1 SUB r0, r1 (计算partition后两边各有几个数)
  • VMOVDQU L, (Lptr) VMOVDQU G, (Gptr) (寄存器写回内存比如数组里面)
  • LEA (Lptr, r0, 4), Lptr LEA (Gptr, r1, 4), Gptr (移动数组指针)
  • 256位的AVX2增加了VPGATHERQQ指令,这样源向量里面可以存kv然后VPGATHERQQ只取出key来比较
  • AVX512增加了VCOMPRESSPS指令,直接通过mask写无需查表
  • 存kv始终是个痛点,放一起会减少向量里面的记录数,A Novel Hybrid Quicksort Algorithm Vectorized using AVX-512 on Intel Skylake 给的一个解决办法是value单独放一个等长的数组,根据mask partition的时候也给value数组操作一次

2023年9月14日星期四

Rocksdb源码笔记

重要流程

DBImpl::Get

  1. GetAndRefSuperVersion首先要获取super version,sv记录column family全局状态,有哪些memtable,有哪些sst等等。所以每次flush或者compaction之后都会更新sv
  2. 更新sv和读取sv可能发生冲突,所以用thread local机制来尽量避免锁
  3. 从super version读,读的顺序是mem(memtable), imm(immutable memtables), current(version sstables)
  4. MemTable::Get 先通过bloomfilter过滤 bloom_filter_->MayContain 然后MemTableRep::Get,如果是skiplist实现就是在skiplist里面找,还有hashskiplist,skiplist是比较均衡的选择
  5. MemTableListVersion::Get 顺着immutable memtable list查找。因为memtable是新写的数据所以总数据占比可能不高,命中率也可能不高
  6. Version::Get 遍历所有的sst files查找数据,遍历的过程l0顺序其它二分加上file indexer。处理每个sst文件是通过TableCache::Get,里面整合了各种读cache和读文件的过程
    1. 如果配置了row cache GetFromRowCache直接拿到kv值。cache key会用上sst file number做前缀,所以不会有update只有get失败时用insert填充
    2. BlockBasedTable::Get 封装了从block cache或者sst file读取的逻辑,这个talbe对应sst file number
    3. FullFilterKeyMayMatch 通过bloom filter快速过滤该sst file
    4. IndexIterator.seek(key) 在索引里寻找key对应的block(包含地址,大小等信息)
    5. BlockBasedTable::NewDataBlockIterator-> BlockBasedTable::MaybeReadBlockAndLoadToCache 从blockcache里面读取或者读sst file并放回block cache
    6. DataBlockIter.SeekForGet 读取block的实际内容
    7. DataBlockIter::SeekForGetImpl 通过hashmap寻找key对应的binary seek index位置,然后再通过这个位置读取block里面的data
    8. get_context->SaveValue 直接将Block中的数据地址赋给用户传进来的PinnableSlice*中(知名优化点)

DBImpl::WriteImpl

  1. WriteImpl 入口,write_thread_.JoinBatchGroup加入当前batch的线程组,抢主(cas队列头),如果失败需要等待到下一个可运行状态(几种情况比较细节)
  2. WriteBatchInternal::InsertInto 如果出来的状态是并发写入memtable,则自己不是leader且leader已经将数据写入wal,可以将自己的数据写入memtable
  3. write_thread_.ExitAsBatchGroupFollower 判断自己是否是最后一个写入memtable的线程,如果是则需要做最后的处理设置last seq、选出下一个正在等待的线程中的leader继续做group commit逻辑
  4. PreprocessWrite 切换wal,memtable等等写前预处理,从这里开始都是leader的行为
  5. write_thread_.EnterAsBatchGroupLeader 初始化batch group leader信息,处理max_size相关逻辑,CreateMissingNewerLinks(newest_writer)将当前writer链表组成双向链表语义上得到了一个batch
  6. bool parallel 判断是否能并发写memtable,重要条件是所有batch里面没有merge操作
  7. WriteToWAL 写wal
  8. WriteBatchInternal::InsertInto 写memtable,根据是否能并发选择写入方式,两个分支在调用参数上稍有区别,并发就只写当前batch,非并发就leader写所以batch。顺序写分支要先write_thread_.LaunchParallelMemTableWriters(&write_group) 唤醒所有其它writer
  9. should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w) 判断自己是否最后一个写memtable的线程,如果是就调用write_thread_.ExitAsBatchGroupLeader设置下一轮的leader

Compaction

  1. 需要关注的几件事,是否要compact,选择哪些文件compact以及怎样compact
  2. DBImpl::MaybeScheduleFlushOrCompaction 后台自动compaction的入口
  3. LevelCompactionPicker::NeedsCompaction 是否需要compaction,会逐层判断分数是否>1,分数计算在VersionStorageInfo::ComputeCompactionScore
    1. 对于L0,文件个数/level0_file_num_compaction_trigger得到一个分数,total_size/level_max_bytes_[base_level_]得到另一个分数取最大值
    2. 其它level,level_bytes_no_compacting/MaxBytesForLevel(level) 计算分数
    3. MaxBytesForLevel可以来自配置也可能动态算的,动态算法是先到数据量最大一层,然后按乘数因子递减
  4. DBImpl::BackgroundCompaction->LevelCompactionBuilder::PickCompaction 选择需要compact的文件
    1. SetupInitialFiles->PickFileToCompact 先选择startlevel和targetlevel,之前已经根据分数倒排序所以直接拿最上面那层做start,targe直接+1,PickFileToCompact 选择要compaction的文件,首先startlevel根据文件大小倒排然后选最大的,会用ExpandInputsToCleanCut把range overlap的都选进来,但在level compaction似乎没什么用,然后选择合适range空间的output files,GetOverlappingInputs,同样也会ExpandInputsToCleanCut也没啥用。选好后会跳过正在compact的文件。
    2. GetOverlappingL0Files 如果start level是L0逻辑会有区别,需要比较所有文件
    3. SetupOtherInputsIfNeeded->SetupOtherInputs 如果是L0,start在上面一步变化了,output也要相应变化
    4. GetCompaction最终返回一个Compaction对象
  5. CompactionJob::Prepare->GenSubcompactionBoundaries 需要并发的compact会被抽象成sub compactions,这里会解析生成sub compaction以及对应的边界
    1. 原理https://github.com/facebook/rocksdb/wiki/Subcompaction
    2. Compaction::ShouldFormSubcompactions满足条件才做sub compaction,leveled情况下L0的compaction或者kRoundRobin选文件或者手动状况下满足条件
    3. 遍历所有level所有file,生成对应的anchor,一个文件128archor,代表一个范围。然后排序去重
    4. 然后计算sub compaction并把所有的archor平均分,并保存在boundaries_里面
    5. 然后Prepare里面会生成sub compaction的列表sub_compact_states
  6. CompactionJob::Run 遍历sub compaction都放到线程池里面启动多线程compaction其中当前线程会分担sub_compact_states[0],执行的函数是ProcessKeyValueCompaction。执行完之后应该直接生成sst file没有合并这一步
    1. 实际上每次BackgroundCompaction一般是从start选一个文件output选多个文件,然后多次BackgroundCompaction形成并行关系,满足sub compaction条件之后可以进一步文件内部并行
  7. ProcessKeyValueCompaction 取出subcompaction的kv放到一个迭代器里面(此时会构造堆结构)
    1. VersionSet::MakeInputIterator 对L0每个文件建立一个TableCache::NewIterator对其它层整层建立LevelIterator
    2. NewCompactionMergingIterator 建立堆排序iterator
    3. CompactionIterator->c_iter->SeekToFirst 构建需要使用的iterator
    4. while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) 开始迭代输出
  8. SubcompactionState::AddToOutput 输出到文件
    1. open_file_func 没有builder的情况下新建table builder
    2. BlockBasedTableBuilder::Add 添加数据,flush的时候会创建index block
    3. CompactionJob::FinishCompactionOutputFile->BlockBasedTableBuilder::Finish 写各种index filter block完成文件

DeleteRange

  1. 参考https://rocksdb.org/blog/2018/11/21/delete-range.html
  2. 如果没有delete range,要用delete来做需要seek start, iterate and compare很慢。而且做scan的时候如果tombstone很多要一个个过没法快速跳过会很慢
  3. 基本的解决思路是写入的时候写入range tombstone,在memtable里面是range tombstone,在sst file里面有一个range deletetion block,读的时候通过在range合并之后的天际线里面二分查找快速判断是否删除,如果删除可直接返回
  4. range deletion block存储格式见https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format 末尾
  5. compaction或者flush的时候会清理过时的tombstone, tombstone到达sst最底层时可清除因为它就像个罩子来判断它下面层次的数据是否被删除
  6. DB::DeleteRange->DeleteRangeCF->DeleteImpl->MemTable::Add 会选择range_del_table_插入数据
  7. DeleteImpl->CheckMemtableFull 如果memtable满就flush这个带着tombstone的table
  8. ProcessKeyValueCompaction compaction过程迭代inputs
    1. CompactionRangeDelAggregator::AddTombstones 收集input里面的tombstones
    2. 如果是merge操作 MergeHelper::MergeUntil->ForwardRangeDelIterator::ShouldDelete 合并的时候判断key能否从range tombstone里面删除。如果当前key没有快照引用,或者版本比tombstone里面高,tombstone里面都可以删除
    3. 如果是put操作可以直接删除
    4. 没删掉的tombstone通过builder->Finish写入文件

DBImpl::IngestExternalFiles

  1. 参考https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files
  2. ReserveFileNumbersBeforeIngestion 获取下一个可用文件号
  3. ExternalSstFileIngestionJob::Prepare 将待导入文件拷贝/移动到db内部的sst文件中,导入前会判断如果多个文件是否有范围重叠
  4. WriteThread::EnterUnbatched 停写
  5. ExternalSstFileIngestionJob::NeedsFlush 判断是否要flush memtable,逻辑在ColumnFamilyData::RangesOverlapWithMemtables应该是简单的范围比较而不是遍历每个key。然后flush,DBImpl::FlushMemTable
  6. ExternalSstFileIngestionJob::Run 实际ingest逻辑
    1. CheckLevelForIngestedBehindFile 如果有ingest_behind标志,直接尝试放最后一层,如果有key重叠返回失败
    2. AssignLevelAndSeqnoForIngestedFile 从L0往下看每层首先CompactionPicker::RangeOverlapWithCompaction是否和本层正在进行的compaction output的范围冲突,Version::OverlapWithLevelIterator,IngestedFileFitInLevel(两个类似)再检查每个sst file是否有冲突
    3. AssignGlobalSeqnoForIngestedFile 为ingest_file获取global seq no
    4. edit_.AddFile 更新文件元信息到VersionEdit
  7. write_thread_.ExitUnbatched 恢复写入

Version相关

  1. 数据库lsn记录在last_sequence_,在DBImpl::WriteImpl每次写都会增加
  2. DBImpl::GetImpl读取的时候会使用seq构造LookupKey去读,seq来源GetLastPublishedSequence,LookupKey的构造类似于user_key + sequence + type
  3. 这个lookupkey有三种使用方法memtable_key(全部内容),internal_key(去掉长度描述),user_key(再去掉sequence和type只留下外部传入的key)
  4. 在memtable里面iterate的时候根据internal_key来寻找第user_key相同internal_key大于等于的key
  5. TableCache::GetFromRowCache row cache在查找的时候会用row_cache_key去查找,row_cache_key的构成是fd_number+seq_no(正常情况可能是0)+user_key。这就清楚了为什么不用update row cache了,其实还是和sst file关联的

性能优化

FileIndexer

  1. 大体思路是上一层确定了key在某个文件的范围,但是又不在这个文件中的时候可以利用这个信息加快下一层的查找。
  2. 可以做一个文件相对位置的索引,上一层的每个文件的start和end对应下一层的文件和位置

减少一次Get过程中的内存拷贝

  1. 问题是之前的版本数据从sst读取之后需要一次copy给返回给用户的变量,那明显的解法是直接返回sst读取之后内存块里面对应的地址
  2. 带来的问题是DataBlockIter本来是填充block cache之后就释放,如果返回地址给读接口,可能读接口拿到值引用时空指针
  3. 解决方案是引入了Cleanable接口,然后DataBlockIter的clean委托给PinnableSlice(Get使用的结构),等Get使用完了以后一起释放

降低Statistics代价

  1. 使用CoreLocalArray数据结构保证数据不会跨cpu访问

写wal优化

  1. 多线程写wal的时候会有竞争,所以比较好的方式是一个线程收集所有线程的写请求batch写,写完之后通知其他线程继续,可以恢复成多线程写各自的memtable
  2. 这里的线程协调有个很微妙的性能点,leveldb使用在MutexLock里面pthread_cond_wait的方式来让其他线程等待,这样会导致context switch比较重。注意虽然pthread_mutex_lock使用futex有spinlock快速返回的逻辑,但是pthread_cond_wait没有。所以这里需要自定义比较精细的wait逻辑disruptor里面其实也需要
  3. 代码实现在WriteThread::AwaitState,首先busy loop并使用asm volatile("pause")避免cpu流水线重排,如果条件不满足再进入short wait,使用yield,还不行就进入long wait,使用cond.wait()

二分查找cache miss问题

kDataBlockBinaryAndHash

  1. 参考https://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
  2. 简单说就是加个hashmap索引key信息的相对位置

阿里x-engine的解法

  1. 参考https://zhuanlan.zhihu.com/p/114681578
  2. 如果用有序数组存key,value-address,在二分查找过程中要反复跳地址,只要跳动的长度大于cache line的长度就无法使用上次缓存就会cache miss
  3. 解决办法是用两层的b+ tree,b+ tree内节点只存放key所以很紧凑,寻址过程可以充分利用缓存。两层是实践选择

InlineSkipList

  1. 参考RocksDB 源码分析 – InlineSkipList
  2. 节点的指针和key存在一起,上面n层节点指针的位置分别在数组对应的-n,-(n-1),...-1的位置,因为skiplist常用操作是从上层节点一个一个往下找,所以变成了顺序而且局部的内存访问
  3. Splice主要是记录上次insert时候每层的range,如果是顺序插入就可以利用上次的range迅速缩小范围
  4. 并发比较普通,每一层独立做CAS

并发相关

ThreadLocalPtr

  1. 参考https://zhuanlan.zhihu.com/p/398409455
  2. 每个线程一个ThreadData,里面有个vector存储多个thread local数据对象。ThreadData组成链表方便一起处理
  3. ThreadLocalPtr::StaticMeta是个singleton,所有的ThreadLocalPtr都指向它

LRU Cache

  1. LRUCache最外层实体 LRUCacheShard缓存分片 LRUHandle基本存储元素封装k,v
  2. LRUCacheShard::Lookup 读链路,加锁(本分片锁),ref+1设置hit,没有传统lru的移动位置操作。lookup之后要调用Release
  3. LRUCacheShard::Insert 写链路,参数可带优先级,加分片锁,先看是否需要释放如果需要就先从尾部删除,然后插入table,然后从顺着优先级往下看哪里有空间就插到对应的head上。
  4. LRUCacheShard::LRU_Remove 删除,变化指针以及三个优先级的容量
  5. LRUCacheShard::Release get使用结束以后需要调用release接口,引用计数-1,然后判断引用计数是否=0,=0且容量不足就删除这个节点,=0有容量会调用LRU_Insert插回去。>0就什么都不做
  6. 所以lru的逻辑就是lookup过的item(hit),在引用变0的时候如果容量够有一次机会从高优先级区开始重新插入,而优先级的逻辑就是低优先级插到队列中间偏后的位置,淘汰的比较快一些
  7. 总结来看是一个比较标准的带分片和优先级的lru实现

Clock Cache

  1. LRU Cache的问题是每次读会导致重新插入,锁竞争激烈
  2. 大致逻辑是,分片,每个分片一个环形队列,释放空间时循环扫描如果entry上次扫描之后又被置了访问标记就清理标记继续,否则就干掉entry。这应该是rocksdb某个版本的实现
  3. 这个简单算法的问题是如果整体访问很多都被置了标志就退化成fifo了,解决方案是Two-Handed Clock,有两个指针,一个负责周期性扫描清理标记,另一个负责看被清理的标记是否又被置位如果置位说明访问频繁,否则可以干掉,扫描速度也可以自调整
  4. Rocksdb的HyperClockCache基本思路也是这样,细节还需要研究

MultiGet为啥快

  1. https://github.com/facebook/rocksdb/wiki/MultiGet-Performance
  2. 少了很多虚函数调用
  3. block index和filter都放在lru里面会有锁竞争,批量访问同一个sst file的key就免了这些竞争,不过clock的方式就没有这问题?
  4. 每次访问sst会访问bloom filter会带来cache miss,batch操作可以用pipeline的方式隐藏cache miss latency,这个技巧比较高端
  5. 单次请求多个访问同一个sst file可以通过io uring的接口io并行访问,多个请求就比较难这样做

存储格式

  1.  sst文件格式 https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format
  2. key的存储做了delta压缩,每隔k个key会有一个kv不做delta压缩被称作restart point这个被用来辅助binary search
  3. kv实际存下来shared_bytes:unshared_bytes:value_length:key_delta:value

    和hbase性能比较

    1. hbase get需要寻址region,当然结果可以被缓存
    2. leveled compaction对比类似universal compaction,以及rocksdb的file indexer
    3. hbase hfile是多层index block+data block,rocksdb是block meta block + block(index part+data part) meta block一般常驻内存
    4. block mem 内查找过程 hbase看起来是线性查找,rocksdb是binary+hash辅助
    5. block cache hbase一般存索引节点,rocksdb除了block meta还可以存 data block
    6. rocksdb有row cache
    7. flush 简单ab table vs 多列族导致复杂slab分配
    8. blockcache clockwise vs dumb lru
    9. memtable 超级优化vs ConcurrentNavigableMap
    10. 参考https://bbs.huaweicloud.com/blogs/192853

    问题

    block cache什么时候更新的?

    因为是block级别的所以应该不会被更新,只会随着compaction之类的动作被重置

    row cache什么时候更新的?

    不用更新,查询的时候sst file no会在条件里面

    redis足够快了吗?

    本质说就是一个单线程hashmap get操作,expireIfNeeded->dictHashKey->dictGetVal核心链路就是这样,除了可以干掉expire,让hash算法更简单一点外,似乎没有很大程度优化空间

    speedb

    性能点

    1. 写放大系数降低80%,重写了lsm tree https://www.linkedin.com/pulse/speedb-dramatically-improves-rocksdb-response-times-performance/,这一条似乎缺乏其他资料印证
    2. 读稳定性和p99提升,p99稳定大约是rocksdb的1/4,底层算法提升,compaction动态自适应调整,根据workload调整flush和compaction
    3. speedb认为rocksb读延迟高不是因为lsm tree,而是compaction相关缺乏资源管理

    优化细节

    1. snapshot-optimization https://docs.speedb.io/speedb-features/snapshot-optimization 缓存snapshot,减少获取snapshot锁冲突
    2. Write Flow optimization https://docs.speedb.io/speedb-features/write-flow
      1. rocksdb写wal是append只能单线程,memtable switch会block,写流程中还有其他不少需要全局锁的地方
      2. spdb改变了写流程,独立一个线程处理memtable switch,flush request到memtable,wal switch等等,这个线程用两个container(memtable?不确定)切换。
      3. 多个write batch组成一个batch group,以group级别刷wal,可以多个group并发刷wal
      4. 主要的优化点是db锁变成读写锁,写wal和memtable变成并行,写wal是计算文件位置再写,可以并行。memtable和wal切换都不会有db锁不会block整个流程
      5. 新流程带来的一个复杂度代价是写memtalbe失败需要rollback wal写
      6. 该优化带来近两倍的写吞吐,但个人理解并不能降低cpu使用率
    3. Global Delayed write https://docs.speedb.io/speedb-features/global-delayed-write, 把WriteController从per db变成全局,做全局的流量分配。这个知道就好
    4. Proactive Flushing https://docs.speedb.io/speedb-features/proactive-flushing flush行为不再是被动发生在write调用的时候,判断依据不再是年龄而不顾数据多少
    5. Sorted Hash Memtable https://docs.speedb.io/speedb-features/sorted-hash-memtable 要并发写,fast read,又要能scan,默认数据结构中只有skiplist,shorted hash map的结构是一个concurrent hashmap+一个list,这个list分成无序和多个有序的vector部分,读很简单hash o(1),写同时写hashmap和无序list,后台线程整理无序list成有序vectors,整理发生在无序size超过10000或者发生scan的时候。不得不说这种根据场景打破常规做法的数据结构比较牛逼。写提升155%,随机读提升50%
    6. Paired Bloom Filter https://docs.speedb.io/speedb-features/paired-bloom-filter
      1. 原始的bloom filter局部性很差,对于一个positive key查询会有k(函数数量)次cache miss
      2. blocked bloom filter是个改进,会把内存划分成blocks,每个block大小等于cache line size(512b),对于每个key先拿一个函数映射到一个block,再拿k个函数在block内做原始bloom filter。这个是rocksdb的默认实现方式
      3. 但问题是随着key的位数增长,fpr增加得比较多,核心原因是每个block的key的数量差异很大
      4. 解决方式是通过pair来减少key的数量差异,先做block映射,然后把block分片成很多batch,一个batch 128 block,然后通过batch内通过block key的数量排序block首尾匹配成64个pair,key的前k/2个函数映射到一个block,后k/2映射到paired block,block在batch中的位置需要额外几位来存
      5. Ribbon filter比较省内存,但是耗4-6倍cpu,所以看场景
      6. 随机读比默认算法吞吐翻倍
    7. Range Delete Improvement https://docs.speedb.io/enhancements/remove-single-delete-elements-during-memtable-flush 原来的实现delete range的key还是会flush到sst,这个变更在flush过程中会过滤,一个比较细节的提升
    8. Dynamic Delayed Writes https://docs.speedb.io/enhancements/dynamic-delayed-writes 更精细的delayed write rate计算方式让变化更线性
    9. Reduce switch memtable latency https://docs.speedb.io/enhancements/reduce-switch-memtable-latency 提前准备用于switch的memtable,在切换的时候更平滑,但是要付出额外内存的代价


    参考链接


    2021年10月11日星期一

    IO性能相关

     SSD性能

    1. 固态硬盘有逻辑地址和物理地址的关联系统,所以不管顺序或者随机写都没有寻址时间,都是顺序写入nand。反复更新热点数据nand也会被均匀消耗。
    2. 没有读改写的过程,写入都是先将数据写入新的物理地址,再将原来的地址标为无效。
    3. ssd已经写入过数据的块需要擦除才能再次写入,类似gc的过程。随机写的数据擦除难度要大于顺序写。
    4. 队列深度,端口队列中等待服务的I/O请求数量,有点类似于网络服务中的缓存队列。队列深度和多线程还是有区别,单线程的时候,即使队列深度大于1,但每个访问请求的寻址和传输都是串行的,也就是必须先寻址然后才能传输。而多线程的时候,不同线程的这两步是可以并行的。
    5. ssd的顺序访问的定义,如果 I/O 操作开始的逻辑块地址(LBA)直接跟着前一个 I/O 操作的最后LBA,则称值为顺序访问
    6. 即使ssd随机访问也比顺序访问慢。原因是随机读不能利用预读功能提前缓存数据,每次 IO 需要重新定位物理地址,随机写造成大量磁盘碎片,极大地增加了垃圾回收的压力,小数据量的随机写无法利用 SSD 内置的并发能力
    7. ssd多线程随机读性能很好,某些时候甚至接近顺序读,因为ssd芯片本来就支持并发处理,顺序读的提前预测地址的能力相当于被多线程同时传入多个地址实现了。多线程随机写在磁盘充足的情况下也是同理,但是一旦gc性能就下来了,所以控制磁盘使用量也是一种优化手段。


    参考链接 

    写放大

    不能原地更新只能先擦除再写入,写入是页为单位擦除只能块(很多页)为单位,如果一个块上面部分是有效数据部分要擦除只能把有效数据挪到其它块然后整块擦除,写放大就出来了。擦除过程类似GC,空余空间越多越容易腾挪,写放大就相对较小,所以不要把SSD写太满。

    高延迟网络优化

    TCP BBR

    1. 基于tcp的拥塞控制算法,在4.9以上的linux直接通过参数配置就可以使用
    2. 传统拥塞算法会不断增加窗口直到把带宽占满buffer占满直到丢包,丢包之后又会迅速降低发送速率。导致两个问题,高延迟网络往往丢包率高-不仅仅是buffer占满的丢包所以经常降低发送窗口导致低带宽利用。需要占满带宽和buffer来探测带宽导致高延迟。
    3. bbr解决方案关键点在于不用丢包判断带宽而是直接通过确认包来判断,慢启动发现带宽不再增加后不会进入占满buffer阶段。
    4. 网络链路上能容纳的数据包数量 = 链路带宽 × 往返延迟,所以还需要探测延迟,而buffer较满的时候延迟会有偏差,所以bbr会定时有低频率发包估算延迟的阶段。
    参考链接

    KCP

    1. 通常基于udp,实现了tcp的可靠传输,顺序,流控等等
    2. 选择性重传而不是tcp的全部重传,快速重传而不是等超时,立即ack而不是延时ack,非退让流控可以直接占满带宽,这几个是快的原因。如果丢包率高,重传应该对整体速度影响较大,快重传很有意义。
    3. 缺点是两端要安装额外的客户端,有额外的10%-20%的带宽浪费,另外主要应用好像在游戏,视频领域
    参考链接

    SDPK&IO_URING

    1. 两者加速原理类似,在高速存储设备上(nvme ssd)用轮询代替中断避免大量中断消耗,避免内核态用户态切换,避免系统调用消耗,同时提高响应稳定性降低延迟
    2. SDPK仅通过单核就能带动很多块ssd,单核能力上优于uring,uring使用内核线程轮询方式下可以接近SPDK的性能
    3. 普通aio iops只能达到SDPK的一半左右,latency在150k以上会提高很多libaio,SPDK,和io_uring的性能比较

    INODE知识摘要

    1. 面对硬件层面用块(4k)的方式管理,要解决的是文件和块的关系问题
    2. 顺序存有碎片问题不可取,链式存原地链怕磁盘损坏断链,集中存就是文件分配表(fat),占内存,查找慢。所以只好用索引的方式,为每个文件创建一个索引块就是inode
    3. 如果文件快太多,可以用多级索引,比较原始。ext4使用extend mapping,extend代表连续的块,然后众多的extend用b+ tree来管理
    4. 空闲块,空闲inode,可以用位图管理
    5. 目录存文件名和inode的关系,文件名hash处理加速查找

    2021年8月22日星期日

    InnoDB总结

    最近读了一本不错的讲mysql的书MySQL是怎样运行的,借这个契机总结一下自己的InnoDB相关的知识。

    MySQL整体架构

     

























    关注的第一个问题,数据记录是怎么存储的?


    表空间和表

    一个数据库对应一个目录




























    表结构存储

    表名.frm

    表数据存储

    1.使用系统表空间 - 所有数据存储在指定文件(一个或多个)自动扩展大小
    2.使用独立表空间 - 为每个表生成“表名.ibd ”文件,ibd文件存储表中的数据和索引

    表空间结构(数据文件)

    从大到小的逻辑概念是,表空间->段(segment)->区(extent)->页->记录。大家熟知的常识是数据被组织成一颗b+ tree,为了优化单次io的效率,引入了页的概念,io以页为单位。一个表可能有很多页,为了范围扫描的时候尽量是顺序io,引入了区的概念,区里面的页面是连续的。b+ tree的内节点和叶子节点如果混放,也可能会破坏顺序io,所以又引入段的概念,内节点是一个段,叶子节点是一个段。不过还有其他类型的段比如回滚段。

    下面图中FSP_HDR描述表空间属性和本组256个区的属性,XDES描述本组256个区的属性,INODE描述段的属性














    上面的图是独立表空间的结构,系统表空间的结构和独立表空间基本类似,只不过由于整个MySQL进程只有一个系统表空间,在系统表空间中会额外记录一些有关整个系统信息的页面,所以会比独立表空间多出一些记录这些信息的页面。

    段结构

    段不对应表空间中某一个连续的物理区域,而是一个逻辑上的概念,由若干个零散的页面以及一些完整的区组成。























    上图是段属性描述结构

    插入数据的时候需要,段->区->页,寻找一个页来插入记录。为了存储效率减少碎片会把区分成Free, Not Full, Full 几种类型,然后用链表串起来,用于这几个链表定位的基础节点就是上图蓝色部分。段中完整的区之外零散的页面由灰色部分管理,描述的是页面的页号。

    区结构

    对于16KB的页来说,连续的64个页就是一个区,也就是说一个区默认占用1MB空间大小。不论是系统表空间还是独立表空间,都可以看成是由若干个区组成的,每256个区被划分成一组

















    对应段里面对区的管理,区被组织成链表所以有List Node结构。Page State Bitmap表示页面是否空闲的状态。

    页结构

    出于io效率考虑,页是mysql一次io操作的最小单位,而不是记录,页的大小一般是16k。
























    1.File Header存放checksum,页号,页类型等页的全局信息,最重要的是会存放上一页和下一页的页号这样就组成了b+ tree需要的链表结构

    2.Page Header存放页面状态,比如记录数量,还未使用的最小地址,页面在b+ tree中的层级等

    3.File Trailer用来做页面完整性校验,保证刷盘时没有出意外

    4.Infimum+supremum存放最大最小两条虚拟记录

    5.User Records数据记录存入的过程就是不断找Free Space申请空间的过程















    每条记录之间也会形成链表结构,依靠的是记录头的next_record字段。

    记录删除不是物理删除,否则挪动其它记录代价很大,因此在记录头上有个delete_mask。比较自然的做法是删除的记录会从有效记录的链表中拿掉,被删除的记录也组成一个垃圾链表,新记录插入的时候可以重用。

    每条记录在本页中会有一个逻辑位置,是记录头中的heap_no。

    数据记录的大致状况如下图















    6.Page Directory用来加速页内记录查找。具体做法是把记录按相邻关系分成很多组,每组最后一条数据头部n_owned属性记录该组记录数,并把每组最后一条数据的地址偏移量记录到页目录。查找的时候通过页目录二分查找到记录对应的组,然后再组内顺序查找。

    页内部的记录会严格主键自增么?

    应该不会,否则不顺序插入时移动的代价太高,应该只有链表上的逻辑顺序。但页面之间的顺序应该会保持。

    数据记录行

    有多种格式,看一下compact格式的












    1.变长字段长度列表描述对应变长列的长度,按列的逆序放置,只存非null列的内容。

    2.为了节省空间对null值做特殊处理,null值列表按位图的方式存放。

    3.每行数据的最大长度是有限制的,而每个页的大小限制是16k,如果有超大字段导致溢出,会溢出到其他的页,数据行会存一部分数据和一个指向其他地址的指针




















    关注的第二个问题,记录存储和b+ tree的关系?

    B+ Tree索引

    在不考虑索引的情况下页和数据记录已经组织成链表结构












    加入和数据页类似的目录项页组织成b+ tree之后就变成下面这样,




















    目录项和数据记录以比较相似的方式组织称目录项页,主要的区别是record_type不同,再者就是没有数据记录里面那么多列。

    使用主键进行排序,叶子节点直接存储所有数据列的就是聚簇索引。

    用其他列C2进行排序,叶子节点只存储C2+PK两个列的值的b+ tree是二级索引,二级索引查询要多一次回表操作。

    存储碎片问题

    删除空间重用的过程

    1.每当新插入记录时,首先判断PAGE_FREE指向的头节点代表的已删除记录占用的存储空间是否足够容纳这条新插入的记录,如果不可以容纳,就直接向页面中申请新的空间来存储这条记录

    2.如果可以容纳,那么直接重用这条已删除记录的存储空间,并且把PAGE_FREE指向垃圾链表中的下一条已删除记。个人理解这一步会造成记录逻辑顺序和物理顺序不同。

    3.如果新插入的那条记录占用的存储空间大小小于垃圾链表的头节点占用的存储空间大小,那就意味头节点对应的记录占用的存储空间里有一部分空间用不到,这部分空间就被称之为碎片空间

    4.当页面快满时,如果再插入一条记录,此时页面中并不能分配一条完整记录的空间,这时候会首先看一看PAGE_GARBAGE的空间和剩余可利用的空间加起来是不是可以容纳下这条记录,如果可以的话,InnoDB会尝试重新组织页内的记录,重新组织的过程就是先开辟一个临时页面,把页面内的记录依次插入一遍,因为依次插入时并不会产生碎片,之后再把临时页面的内容复制到本页面,这样就可以把那些碎片空间都解放出来(很显然重新组织页面内的记录比较耗费性能)

    所以碎片产生原因是删除数据就会导致页(page)中出现空白空间,大量随机的DELETE操作,必然会在数据文件中造成不连续的空白空间。而当插入数据时,这些空白空间则会被利用起来.于是造成了数据的存储位置不连续。物理存储顺序与逻辑上的排序顺序不同,这种就是数据碎片。update操作除了原地的之外,也有可能产生这种问题。

    不是说数据记录先存储在内存中的么?

    内存存储

    所有读写操作都走磁盘io并不经济,所以mysql会有缓存叫做Buffer Pool。










    1.如上图,Buffer Pool是一块连续的内存空间用来存储数据页,这些数据页和表空间(文件系统)的数据页应该是一样的。

    2.控制块描述对应页的控制信息包括该页所属的表空间编号、页号、缓存页在Buffer Pool中的地址、链表节点信息、一些锁信息以及LSN信息

    3.我们需要知道哪些页面是空闲的,哪些页面是脏的需要flush,所以就有了两个链表,这两个链表都是通过控制块存的指针来组成

    4.读数据页的时候会先从Buffer Pool里面找,需要一个hash map来快速查找,key是表空间号 + 页号(这也是inno db定位页的统一坐标),value是缓存页地址

    5.Buffer Pool空间有限,所以需要用lru来管理,简单的lru碰到预读和范围扫描这些范围读取的场景都会相当于无效,所以可以对lru分区成old和young,对于范围扫描先加载到old,对于old区访问小于一定时间间隔才到young。对于young区的热数据也不需要每次访问都移动,进一步分区只有较不频繁的分区被访问才移动。

    6.可以使用多个Buffer Pool提高多线程性能。

    应该有个write ahead log吧?

    redo log

    redo log就是inno db中的write ahead log,把事务提交是的随机io变成redo log的顺序io+异步刷盘时的批量io。redo log的特点是顺序写,占用空间小主要记录存储表空间ID、页号、偏移量以及需要更新的值。

    之前比较naive的想法是记录个sql就行了,但实际上会比较复杂,redo log更像一个前后的状态变更,比如插入一条数据可能会更新多个索引,对于每个索引有可能造成b tree分裂这种情况会更新多个页面。如果纯按物理状态变更的方式来记日志可能会记相当多的信息,比如File Header、Page Header、Page Directory等等部分,所以感觉inno db采用的是物理+逻辑得折中方案,比如下面这个MLOG_COMP_REC_INSERT类型的redo log
















    向某个索引对应的B+树中插入一条记录的这个过程必须是原子的但又可能涉及到多个页的修改产生多条redo log,这些日志被编成一个组称作一个Mini-Transaction。

    redo log也被组织成页的形式,不同的是页的大小是512字节,写入的时候也是先写内存在合适的时机刷盘,在内存的时候也是连续的内存空间被组织成log buffer。并发事务的redo log是可能交替写入的,写入过程可以看下图。








    有一个较为重要的属性叫Log Sequence Number(LSN),代表系统写入的redo日志量的总和,也是当前写入日志的逻辑时间。

    undo log

    之前一直有疑问为什么需要undo log,如果只是为了回滚事务状态,为啥不能直接用redo log反向操作?现在我理解主要原因是(1)事务能并发,用结构单一的redo log回溯可能复杂度较高,undo log有事务维度的数据结构比较好处理(2)更重要的是需要支持mvcc,所以需要不同的数据结构

    数据记录维度

    数据记录有几个和undo log相关的字段








    1.trx_id,事务id,注意数据记录只有一个事务id字段,意味着唯一占用,对数据记录的操作应该是整个事务内加锁的。之前猜测mysql mvcc遇到写写并发然后类似于乐观锁的方式让一个事务失败是错误的。

    2.roll_pointer,数据记录指向undo log的指针。

    insert,update,delete在undo log上的表现很不相同,所以要分开看

    insert日志结构

    insert操作对应的undo操作是删除就好,而删除也只是状态位和链表处理,即使insert过程中产生了页分裂,也不需要把之前的页合并回去。











    delete日志结构

    delete操作分两个阶段,第一阶段只更新状态位在事务提交阶段才会更新delete链表,如果要回滚只需要考虑第一阶段的影响。













    old trx_id,old roll_pointer,关联之前的undo log形成一个链表,方便后面回溯。

    update日志结构

    update操作分更新主键和不更新主键的情况,不更新主键又根据被更新列的存储空间是否发生变化而不同。(1)如果更新列存储空间不变就可以做就地更新 (2)如果列的大小发生改变就需要先删除(状态位+链表),再插入 (3)更新主键也是先删除再插入,但插入位置可能不是当前页了

    不更新主键的日志结构如下,和delete的undo log类似














    更新主键的日志会是两条,TRX_UNDO_DEL_MARK_REC和TRX_UNDO_INSERT_REC,之所以会这样是因为相当于产生了两条记录,两条记录的状态都需要跟踪

    相关逻辑结构











    1.从整个结构看在系统表空间的第5号页面中存储了128个Rollback Segment Header页面地址,每个Rollback Segment Header就相当于一个回滚段。这是因为可能并发事务多,一个回滚段描述不了所有事务。

    2.每一个Rollback Segment Header页面都对应着一个段,这个段就称为Rollback Segment,翻译过来就是回滚段。回滚段管理所有Undo页面链表的头节点。

    3.undo log分为两个大类,TRX_UNDO_INSERT和TRX_UNDO_UPDATE,两种日志不能混着放。每个事务可能有4个undo log页面链表,除了log类型还有普通表和临时表维度。区分临时表的原因是因为mysql会为undo log本身也记录redo log,但临时表不需要持久化所以不需要记redo log,所以做了这种区分。

    为事务分配Undo页面链表详细过程

    1.到系统表空间的第5号页面中分配一个回滚段,其实就是获取一个Rollback Segment Header页面的地址

    2.看一下这个回滚段的两个cached链表有没有已经缓存了的undo slot

    3.如果没有缓存的undo slot可供分配,那么就要到Rollback Segment Header页面中找一个可用的undo slot分配给当前事务

    4.找到可用的undo slot后,如果该undo slot是从cached链表中获取的,那么它对应的Undo Log Segment已经分配了,否则的话需要重新分配一个Undo Log Segment,然后从该Undo Log Segment中申请一个页面作为Undo页面链表的first undo page

    5.然后事务就可以把undo日志写入到上边申请的Undo页面链表了

    MVCC

    事务这块有个不完备的总结就是基本都是靠版本号来做可见性控制。我们从一个观察者事务的角度来看mvcc。观察者事务有一个ReaderView的概念,在一个ReaderView里面包含 (1)m_ids 生成reader view时所有活跃的读写事务列表 (2)min_trx_id m_ids中的最小值 (3)max_trx_id 表示生成ReadView时系统中应该分配给下一个事务的id值。(4)creator_trx_id 表示生成该ReadView的事务的事务id,也就是当前观察者的事务id

    比如观察者看到的版本链是这样的,









    读取记录的时候的过程大约是,

    1.顺着版本链由新到旧的方向逐个比较trx_id

    2.如果trx_id和观察者事务相同,意味着当前事务做的修改当然可见。

    3.如果trx_id< min_trx_id,说明这个修改版本早在当前reader view创建前就提交了所以可见,算是个读已提交的行为。

    4.如果trx_id>= max_trx_id,说明修改版本产生在reader view生成之后,这种情况不可见。

    5.如果min_trx_id<=trx_id<max_trx_id,这种情况需要比较trx_id和m_ids,如果trx_id在m_ids中,说明创建ReadView时生成该版本的事务还是活跃的,该版本不可以被访问。

    6.通过调整创建reader view的策略来达成read commit或者repeatable read隔离级别。read commit在每次读的时候创建reader view而repeatable read仅仅在观察者事务开始的时候创建reader view。

    7.再强调一次防止写写冲突是通过事务时间内对记录加锁实现的。

    Insert Buffer

    1.大量insert的时候主键可能是顺序insert的,但是二级索引并不一定,这样会产生大量随机io。

    2.如果二级索引非唯一索引就可以引入insert buffer先缓冲一批二级索引的写入然后在某个时间批量刷盘。

    3.不能是唯一索引的原因是唯一索引判断重复需要走二级索引,大量非顺序插入时同样会产生随机io,做insert buffer失去了优化的意义。

    4.如果宕机没有同步到二级索引的insert buffer里面的数据恢复需要不少时间。

    表连接原理

    简单理解表连接就是一个多重for循环
      
    for each row in t1 {   #此处表示遍历满足对t1单表查询结果集中的每一条记录
        
        for each row in t2 {   #此处表示对于某条t1表的记录来说,遍历满足对t2单表查询结果集中的每一条记录
        
            for each row in t3 {   #此处表示对于某条t1和t2表的记录组合来说,对t3表进行单表查询
                if row satisfies join conditions, send to client
            }
        }
    }
    

    代入到实际点的场景就是对驱动表t1的每条记录去t2表查询符合条件的记录,这样每次访问t2表都会把t2表的数据从磁盘加载到内存,代价相当大。

    Block Nested-Loop Join,引入了join buffer,join buffer就是执行连接查询前申请的一块固定大小的内存,先把若干条驱动表结果集中的记录装在这个join buffer中,然后开始扫描被驱动表,每一条被驱动表的记录一次性和join buffer中的多条驱动表记录做匹配,因为匹配的过程都是在内存中完成的,所以这样可以显著减少被驱动表的I/O代价。

    总觉得查询计划还是挺神奇的,怎么运作的呢?

    查询优化

    信息收集

    1.统计表的行数,按照一定算法(并不是纯粹随机的)选取几个叶子节点页面,计算每个页面中主键值记录数量,然后计算平均一个页面中主键值的记录数量乘以全部叶子节点的数量就算是该表的n_rows值

    2.聚簇索引占用页面数和其它索引占用的页面数,从数据字典里(SYS_INDEXES)找到表的各个索引对应的根页面位置,找到叶子节点和非叶子节点对应的几个链表的头,然后统计对应的区的数量和零散页面的数量。简单说就是可以从数据结构的概要信息中拿到。

    子查询优化

    1.物化,子查询直观的逻辑是直接执行子查询然后放内存里面,但是有可能遇到内存放不下的情况所以引入了临时表,会根据数据集大小选择内存存储或者磁盘存储,这个过程也可以叫物化。

    2.子查询转连接,操作方法可能有子查询的表直接上提进行连接,建立临时表去重再进行连接,对于有大量重复记录的进行松散扫描等等。


    运行态

    后台线程

    1.Master Thread - 核心的后台线程,主要负责将缓冲池中的数据异步刷新到磁盘,保证数据的一致性,包括脏页的刷新、合并插入缓冲(INSERT BUFFER)、UNDO页的回收等

    2.IO Thread - InnoDB 1.0版本之前共有4个IOThread,分别是write、read、insert buffer和log IO thread。

    3.Purge Thread - 事务被提交后,其所使用的undolog可能不再需要,因此需要PurgeThread来回收已经使用并分配的undo页。

    4.Page Cleaner Thread - 是在InnoDB1.2.x版本中引入的。其作用是将之前版本中脏页的刷新操作都放入到单独的线程中来完成。而其目的是为了减轻原MasterThread的工作及对于用户查询线程的阻塞,进一步提高InnoDB存储引擎的性能。

    Master Thread

    1.0.x版本之前的master thread的主要逻辑如下,后续版本做了优化,主要是之前写死的地方做了灵活优化,把刷脏页的操作分离到其它线程

    1.整体逻辑有两个循环,约1秒一次的的循环和10秒一次的循环

    2.每秒一次的操作大致有,undo log刷盘,合并insert buffer,刷盘buffer pool 100个脏页,如果没有没用活动会切换到background loop

    3.每10秒的操作大致有,刷盘100个脏页,合并至多5个insert buffer,undo log刷盘,删除无用的undo log页,刷盘100个或者10个脏页

    4.background loop,如果没有用户活动(数据库空闲时),删除20个无用的undo页,合并20个insert buffer,不断刷新100个页

    Binlog相关

    3阶段提交

    1.事务提交过程中涉及到redo log,binlog,以及事务状态等多个资源协调,所以很自然的是多阶段提交的过程,但是这个和分布式事务的3pc还是有区别的。

    2.简单理解就是1阶段写redo log,2阶段写binlog而且如果2阶段成功一定会进行第3阶段,3阶段写事务状态。


    Group Commit

    1.目的很简单为了提升io性能,对redo log,binlog刷盘做批量io优化。但同时带来的另外一个优化是在slave端可以做并发复制。

    2.每个group内的事务是可以并行的是因为mysql把一个时间点内就绪的事务分成一个group,就绪的事务必然没有资源依赖了,所以也可以判断group内的事务没有互相依赖。依赖判断主要还是基于timestamp来做的,也叫做logical lock。

    3.mysql 8.0引入write set机制来判断事务间的依赖,感觉是在判断事务读写记录有没有交集,提供比logical lock更好的并发性。参考Fastest Parallel replication method in MySQL 8

    4.可以参考文章 MySQL组提交 并行复制原理

    Binlog Server

    1.可以参考kingbus里面的文档

    2.binlog server个人理解主要场景还是在mysql replication体系提供binlog复制

    3.本质上说binlog server是一个没有存储引擎的mysql,也类似于mysql配上black hole引擎

    4.binlog server延迟比实际mysql小,所以在多级mysql同步体系里面可以解决大延迟的问题

    5.binlog server在facebook可以搭配hdfs存储引擎,解决binlog存储时间短占空间的问题

    6.binlog server还可以替代从库,因为同步快。然后新主从binlog server追上数据就可以工作

    并发优化

    1. 参考https://zhuanlan.zhihu.com/p/151397269 以及后面的系列
    2. mysql5.6的问题是对b tree有结构修改,比如update时候分裂,会有整个index级别的X排他锁,阻塞所有读操作,并发度底
    3. mysql8.0引入index级别的sx意向锁不阻塞读,update分裂的时候加index sx锁,并且只在分裂节点和它的parent node加x排他锁。这样不阻塞读,但是单个index多个update分裂无法并发
    4. 8.0需要index sx锁是因为b tree搜索路径自顶向下而加锁路径自底向上容易死锁。polar db的优化是去掉index sx锁,b tree node上加一个指向分裂产生的新节点的指针把分裂过程做成两阶段,每阶段只加锁一个节点,中间状态合法(父节点查不到再用分裂指针查一次)。这样就做到了多个smo的并发。
    5. 下一个问题是多个并发读都会访问根节点并加S锁,在多核架构下多核S锁会互相invalidate cache line会造成10%-20%的性能损失。这个问题学术界有一些解法,但都不是很完美。