2023年10月9日星期一

ClickHouse学习笔记

一直对大数据计算很感兴趣,ClickHouse又是感觉很酷的实现,所以花了一段时间研究,整理在这里。

Debug配置

debug上面踩了不少坑。
  1. 首先是用macbook可以build但要花4,5个小时,用台式机会快很多。
  2. 在老一点版本的clion上可以用gdb+'CMAKE_BUILD_TYPE=RelWithDebInfo'。新版默认是lldb和RelWithDebInfo同时用有点问题,捕捉不到断点。
  3. 最后解决办法是在ubuntu台式机上编译,RelWithDebInfo模式,Debug模式还是有点慢而且debug过程中异常退出的概率有点大,然后clion启动remote debug连接ubuntu上面的gdbserver
步骤
  1. ubuntu上面build CMAKE_BUILD_TYPE=RelWithDebInfo
  2. 启动gdbserver gdbserver :1234 build_light/programs/clickhouse server --config-file=/home/hy/config.xml 需要config.xml打开clickhouse的远程连接
  3. clion上面启动remote debug,关键步骤是需要把ubuntu上面编译的可执行文件下载下来,然后设置成debug环境的Symbol file
  4. 在clion的gdb交互窗口输入 handle SIGUSR1 noprint nostop ,handle SIGUSR2 noprint nostop 或者加到gdbinit
  5. stl pretty print,如果有python问题,可以使用自己的gdb。clickhouse使用的是libc++而不是libstdc++,使用https://github.com/koutheir/libcxx-pretty-printers
  6. 查看stl range的方法,reinterpret_cast<uint64_t &>(key_range.left.storage)

重要流程

启动

  1. main(main.cpp)->mainEntryClickHouseServer->Poco::Util::ServerApplication:run->Server::main(Server.cpp)->Poco::Net::TCPServer.start

请求处理

  1. TCPHandler::runImpl->receivePacket->executeQuery->processInsertQuery/processOrdinaryQuery
  2. bio的模式一个线程一个连接

词法分析

  1. Lexer::nextTokenImpl 典型的一个一个token的处理会回看上一个token

语法分析

  1. IParserBase::parse 所有类型parser都会经过这一步,在expected里面记录parse的轨迹。add过程可能触发Lexer::nextTokenImpl(看index是否大于现在scan过的)
  2. ignore方法寻找并跳过某个关键字
  3. 最简单的例子'select * from test',调用链ParserQueryWithOutput::parseImpl->ParserSelectWithUnionQuery::parseImpl->ParserUnionList::parseImpl->ParserUnionQueryElement::parseImpl->ParserSelectQuery::parseImpl(这里逻辑最多)->ParserTablesInSelectQueryElement::parseImpl->ParserTableExpression::parseImpl->ParserCompoundIdentifier::parseImpl(到这里生成table的ast) 经典的递归下降,会产生ast

AST重写

  1. InterpreterSelectQuery::InterpreterSelectQuery->ApplyWithSubqueryVisitor 把with对应的subquery解析出来放map里面
  2. ApplyWithAliasVisitor 也是处理with好像只全局处理没有递归下降
  3. RewriteCountDistinctFunctionVisitor 把count(distinct)替换成count(group by)
  4. QueryAliasesVisitor 访问ast把所有的别名放在一个map里面
  5. ExecuteScalarSubqueriesVisitor 把子查询替换成常量,先查缓存,如果查不到使用interpreter getQueryInterpreter解释执行成block然后缓存。worthConvertingToLiteral 复杂类型不做替换需要的计算比较多
  6. QueryAliasesMatcher 给每个子查询设置一个单独的别名
  7. rewriteMultipleJoins 重写join 
    1. CrossToInnerJoinMatcher 把不同表之间的=表达式(外部)移到join里面 
    2. JoinToSubqueryTransformMatcher 'select * from t1 join t2 on ... join t3 on ... join t4 on ...' 改写成 'select * from (select * from t1 join t2 on ...) join t3 on ...) join t4 on ...'
  8. JoinedTables::rewriteDistributedInAndJoins
    1. InJoinSubqueriesPreprocessor 关联到distributed_product_mode配置,意义在于分布式子查询的优化,最理想的模式是被关联的表sharding的状态下在每个sharding上做local关联。参考分布式子查询。代码做的事情较简单,根据实际情况加GLOBAL,或者改库表名成remote模式这样分布式存储可以识别并本地化
    2. RenameQualifiedIdentifiersVisitor 改写子查询中的列名,去使用新的表名
  9. additional_table_filters 参考core settings 额外过滤条件
  10. joined_tables.makeTableJoin->replaceJoinedTable 谓词下推,把join table替换成(SELECT * FROM table_name) AS table_short_name

继续AST重写

  1. analyzeFunctionParamValues->FunctionParameterValuesVisitor 收集参数视图参数,replaceQueryParametersIfParametrizedView 参数替换
  2. replaceWithSubquery 在查询语句里用物化视图的sql语句替换视图名
  3. TreeRewriter.analyzeSelect 一系列处理比如别名替换,标量子查询替换成常量,多余的列删除...
  4. renameDuplicatedColumns 重复列改名
  5. TreeOptimizer::optimizeCountConstantAndSumOne 语义和count等同的聚合函数改写成count
  6. translateQualifiedNames db.table.column, table.column类型的名字变成正常的名字,*这样的列名扩充成实际列名
  7. LogicalExpressionsOptimizer 布尔表达式优化,一串or变成in
  8. TreeRewriter::normalize 用户定义的sql function apply(函数名替换成函数体),count_distinct,distinct_if等等几个函数apply,EXISTS(subquery)替换,位置参数改成列名,nullin系列表达式处理,函数名改成小写,公共子表达式删除
  9. expandGroupByAll 把group by all里面的all变成实际列
  10. removeUnneededColumnsFromSelectClause 冗余列删除
  11. executeScalarSubqueries 标量子查询替换
  12. PredicateExpressionsOptimizer.optimize where和prewhere里面的谓词下推到子查询里面,https://github.com/ClickHouse/ClickHouse/pull/2015#issuecomment-374283452
  13. TreeOptimizer::optimizeIf 完成了子查询标量替换之后先尝试直接判断标量把整个表达式替换成条件分支,optimize_if_chain_to_multiif 把三元表达式优化成multiple if
  14. TreeOptimizer::apply后面有一批优化在这个函数里面
  15. optimizeFunctionsToSubcolumns 类似于这样'length(arr)' -> 'arr.size0'
  16. optimizeAggregationFunctions 聚合函数里面的数学运算外提'sum(a * 2)' -> 'sum(a) * 2'
  17. convertQueryToCNF 转换成cnf形式,简单说就是最外层全是and这种形式在某些情况更容易使用索引
  18. optimizeWithConstraints 使用constraints关键字的信息干掉查询中多余的部分
  19. optimizeSubstituteColumn 使用constraints信息做列替换
  20. optimizeGroupBy 从groupby里面干掉单射函数和常量表达式
  21. optimizeGroupByFunctionKeys干掉groupby里面的参数不是groupby key的function(不太确定?)
  22. optimizeAnyFunctions所有操作移动到any function之外any(f(x, y, g(z))) -> f(any(x), any(y), g(any(z)))
  23. optimizeCountConstantAndSumOne 把count(常量)和sum(1)这种的简单变成count
  24. optimizeMultiIfToIf multiIf->if在参数个数合适的情况下
  25. optimizeSumIfFunctions 类似sumIf(123, cond) -> 123 * countIf(1, cond)
  26. optimizeArrayExistsFunctions 类似于arrayExists(x -> x = 1, arr) -> has(arr, 1)
  27. optimizeInjectiveFunctionsInsideUniq移除uniq里面的单射函数
  28. optimizeAggregateFunctionsOfGroupByKeys移除group by key上面的min/max等等聚合函数
  29. optimizeRedundantFunctionsInOrderBy ORDER BY x, y, f(x), g(x, y), f(h(x)), t(f(x), g(x)) -> ORDER BY x, y
  30. optimizeMonotonousFunctionsInOrderBy order by里面的单调函数替换成其参数
  31. optimizeDuplicatesInOrderBy 移除order by里面的多余元素,可能是之前的优化产生
  32. transformIfStringsIntoEnum if中的string参数转换成enum
  33. optimizeLimitBy干掉limit by中的重复元素
  34. optimizeUsing干掉using中的重复列
  35. optimizeOrLikeChain 'or xxx like '->multiMatchAny
  36. replaceAliasColumnsInQuery 查询里面的别名展开替换 `day` Date ALIAS toDate(timestamp),`day1` Date ALIAS day + 1,`day2` Date ALIAS day1 + 1 查询条件where day2 = today()展开成 ((toDate(timestamp) + 1) + 1) = today() 
  37. RewriteOrderByVisitor ORDER BY (a, b) -> ORDER BY a,b
  38. MergeTreeWhereOptimizer 计算并尝试把where里面的表达式移动进prewhere,主要是根据一些特殊表达式里面可以判断列的大小来做判断,比如"column_name = constant"
  39. generateFilterActions 根据additional_filter_ast生成filter action
  40. ExpressionAnalyzer::getRootActions 调用ActionsVisitor遍历AST表达式树,生成ExpressionActions,结构不复杂就是遍历ast生成对应的action

执行计划生成

  1. InterpreterSelectWithUnionQuery::execute->buildQueryPlan->executeImpl
  2. prepared_pipe时会增加ReadFromPreparedSource,从远程表读取
  3. executeFetchColumns 作用是返回读取columns数据的stream,可以理解为最基础的数据拉取stream,有count优化,limit优化等等
  4. preliminary_sort 添加order by, limit, distinct等等step
  5. expressions.first_stage 只用远程执行query的场景,添加step,FilterStep,before_array_join,ArrayJoinStep,before_join,converting_join_columns,join step(稍复杂),where step,aggregate step
  6. need_aggregate 有聚合的场景分片上不执行later-stage动作,在windows函数/ORDER BY之前不执行,need_aggregate=false 一些其它的执行顺序的逻辑
  7. expressions.second_stage 分布式查询场景中的聚合节点
  8. executeMergeAggregated从不同节点拉数据合并,having rollup cube等等逻辑
  9. executeWindow 添加windows函数相关的steps
  10. order by相关逻辑,有一些merge sort的逻辑
  11. limit projection offset相关的处理
  12. executeWithFill 处理with fill逻辑
  13. executeSubqueriesInSetsAndJoins 需要子查询结果构建set

pipeline构建

  1. ClickHouse 源码解析(一):SQL 的一生(上)
  2. QueryPlan::buildQueryPipeline->optimize
    1. QueryPlanOptimizations::tryRemoveRedundantSorting 尝试删除多余排序
    2. QueryPlanOptimizations::optimizeTreeFirstPass 深度遍历plan tree,挨个执行getOptimizations里面所有的优化比如 tryPushDownFilter等等
    3. QueryPlanOptimizations::optimizeTreeSecondPass 同样是遍历plan tree
    4. optimizeReadInOrder-同时使用limit和order by如果order by和表顺序一致的优化,变成FinishSorting
    5. aggregation_in_order-GROUP BY表达式至少包含排序键的前缀或注入函数。当从表中读取一个新的键时,聚合的中间结果可以被最终确定并发送给客户端,distinct_in_order distinct上面类似的优化,
    6. optimizePrimaryKeyCondition-不太明白,enableMemoryBoundMerging-enable_memory_bound_merging_of_aggregation_results相关的设置
  3. while (!stack.empty()) ... DFS方式生成pipeline,生成过程调用updatePipeline函数
  4. ISourceStep::updatePipeline(创建并初始化pipeline)->ITransformingStep::updatePipeline(往pipeline添加transformer)
  5. ReadFromMergeTree::initializePipeline->spreadMarkRanges->readInOrder->createSource 这样就有了从源数据读的processor
  6. LimitStep::transformPipeline 给pipeline添加transformer

pipeline执行

  1. ClickHouse 源码解析(二):SQL 的一生(中)
  2. ExecutingGraph::initializeExecution入口函数,将没有出边的node入栈(direct_edges),然后弹出反复调用updateNode
  3. ExecutingGraph::updateNode,做反向状态传播把output node需要数据的状态传播到input,后面是细节展开
  4. processor.prepare(node.updated_input_ports, node.updated_output_ports) 调用 Node 对应 IProcessor::prepare() 方法尝试pull数据
  5. IOutputFormat::prepare(),更新相邻边的状态和自己节点的状态,如果没有数据就设置成NeedData
  6. if (!need_expand_pipeline) 将 Node 相邻的待更新 Edge 放入 update_edges 这个栈中
  7. if (updated_processors.empty()) 简单来说,就是将Edge所指向的Node放入update_processors栈中
  8. ISource::prepare()最终会执行到这里,由于ISource没有 input,则直接返回Status::Ready
  9. case IProcessor::Status::Ready: queue.push(&node) input节点放入入口传入的queue里面,ExecutingGraph初始化完成
  10. ClickHouse 源码解析(三):SQL 的一生(下)
  11. PipelineExecutor::executeImpl 调度执行每个节点 executeStepImpl 单线程执行分支
  12. tasks.tryGetTask->context.executeTask->graph->updateNode执行完节点后更新相邻节点的状态->tasks.pushTasks新的task入全局队列
  13. executeTask->executeJob->IProcessor::work()算子处理数据、传递数据的关键方法
  14. ISource::work() tryGenerate()生成数据返回chunk
  15. spawnThreads() 多线程分支,比较简单,起多个线程threads.emplace_back(后面有没有线程池不确定) 同时从async_task_queue里面拿数据执行,执行流程和单线程类似

MergeTree读链路

  1. CLICKHOUSE 源码解析: MERGETREE READ PATH
  2. InterpreterSelectQuery::executeFetchColumns->StorageMergeTree::read->MergeTreeDataSelectExecutor::read 入口
  3. MergeTreeDataSelectExecutor::filterPartsByPartition 根据partition(比如date分区)筛选parts selectPartsToRead里面根据minmax_idx_condition过滤。之前的调用链MergeTreeDataSelectExecutor::read->readFromParts->ReadFromMergeTree::initializePipeline->getAnalysisResult->selectRangesToRead
  4. MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes 根据主键或者索引筛选出要读取的part range
    1. markRangesFromPKRange 根据 primary key筛选part下的ranges pk在内存里
    2. filterMarksUsingIndex 根据二级索引筛选ranges 可能会读磁盘
    3. process_part 可能并行选取ranges,这里为止都属于selectRangesToRead
  5. ReadFromMergeTree::spreadMarkRangesAmongStreams->ReadFromMergeTree::read->ReadFromMergeTree::readFromPool 后面是实际读取数据的过程
  6. MergeTreeReadPool::fillPerThreadInfo 将读取任务分配到多个线程,按Disk分配线程负载,最大化磁盘IO带宽利用率,将所有 Part 按物理存储磁盘排序,按序将磁盘负载分散到 N 个线程
  7. MergeTreeReadPool::getTask 并不会将线程需要完成的所有任务初始时全部分配,而是每次仅会分配一个小量级的数据读取任务。通过参数: merge_tree_min_rows_for_concurrent_read / merge_tree_min_bytes_for_concurrent_read 配置读取的数据量级。
  8. MergeTreeRangeReader::read 先读取Pre-Where相关Columns,过程中,还会对数据进行Filter,如果 Filter后无数据,便不会进行后续的Read Remaining Columns步骤。
  9. MergeTreeReaderWide::readRows 函数读取范围行数内的所有列的数据。分别对每一列数据,使用对应的反序列化方法进行读取 readRows->readData->serialization(...)反序列化
  10. SerializationString::deserializeBinaryBulk->deserializeBinarySSE2 读数据并复制到data,可以开simd优化,_mm_storeu_si128 / _mm_loadu_si128为C++编译器提供的SIMD内置函数 _mm_loadu_si128 -> SSE2 指令 MOVDQU _mm_storeu_si128 -> SSE2 指令 MOVDQU 此处为较为简单的 SIMD 应用场景: 快速内存复制 (一次性复制更多字节内存)

MergeTree写链路

  1. CLICKHOUSE 源码解析: MERGETREE WRITE-PATH
  2. InterpreterInsertQuery::buildChainImpl->SinkToStoragePtr write->MergeTreeSink::consume 入口
  3. MergeTreeSink::consume 将Block按照Partition拆成N个Block, 写入Part到Temp目录防止Dirty Read, 将TempPart挂载到表空间下, 这样便可以被客户端查询到数据, 触发一些后台执行的任务(例如执行Merge)
  4. MergeTreeDataWriter::splitBlockIntoParts通过executePartitionByExpression()计算出与partition相关的字段(可能会新增字段),通过buildScatterSelector()计算出row与partition之间的映射关系,通过IColumn::scatter()将每个Column都完成一拆多,最终组合为多个Block
  5. MergeTreeDataWriter::writeTempPart 写数据的具体实现
    1. minmax_idx->update(…) 对 Partition 涉及的字段统计 min-max 值,作为额外的谓词下推索引
    2. stableGetPermutation() 对 Block 数据按照 Order By 字段排序,会预先检查是否排序,因此如果数据已经排过序再写入,事实上写入速度会有提升,stableGetPermutation 为稳定排序, 底层实现为 std::stable_sort,时间复杂度为 O(N((logN)^2))
    3. updateTTL(…) 是为支持 TTL 数据生命周期管理做的数据 min-max 索引,用于快速判断一个 Part 是否命中 TTL
    4. MergeTreeData::createPart 创建 Part 实例 注意 choosePartType() 会按照 Block 大小和行数决定是否使用 Compact/Memory 存储方式,默认为 Wide 方式存储
    5. MergedBlockOutputStream::writeWithPermutation, 实现Block数据的写入 MergedBlockOutputStream::writeImpl->MergeTreeDataPartWriterWide::write首先由computeIndexGranularity() 计算当前Block的IndexGranularity getBlockAndPermute->column->permute会对column排序 writeColumn 将每个 Column 分别写入磁盘,calculateAndSerializePrimaryIndex、calculateAndSerializeSkipIndices 写入主键、二级索引
    6. MergeTreeDataPartWriterWide::writeColumn对Column中的每个granule分别执行 writeSingleGranule()写入Column内容到.bin flushMarkToFile()写入Column.mrk索引
    7. MergeTreeDataPartWriterWide::writeSingleGranule->SerializationString::serializeBinaryBulk(完成序列化,根据数据类型会有不同)ColumnString中的多个StringValue是连续组织的,使用Offsets来记录不同的String边界。序列化时,对每个StringValue先写入长度,再写入String内容,这里是写入到WriteBuffer中,真正落地到磁盘还会经过一次压缩

MergeTree Merge

  1. ClickHouse 源码解析: MergeTree Merge 算法
  2. 每次插入数据都会在后台尝试merge,但可能因为选不出合适的part跳过
  3. 选择part的逻辑,首先加入只选择相邻part的限制,然后遍历所有组合两层for循环组合嵌套,选择min sum_parts_size / (parts_num - 1),就是尽量选小而多的组合,比较碎片。整体上说应该是universal merge,n层几个merge出来生成n+1层
  4. merge算法是k-way merge+k堆,最常规做法。合并数据有两种做法,Horizontal 与 Vertical,Horizontal简单每次写一整行,Vertical按列写比较向量化,先写主键以及对应的part,然后拿着part信息再写其它列
  5. StorageMergeTree::scheduleDataProcessingJob->SimpleMergeSelector::select 选择需要merge的parts的具体实现
  6. MergingSortedAlgorithm::mergeImpl Merge的实现,就是循环的从待Merge的Part中(存放于queue对象中),找到最小的数据行,并将其放到输出结果集中queue的实现为SortingHeap。针对 Vertical 算法,需要额外输出 row_source,即输出数据行对应的原始 Part 编号
  7. ColumnGathererStream::gather VerticalMergeStage 阶段的主要任务就是为剩余的Gatherer Columns 包含的列进行输出

Group By原理

  1. MySQL分组查询Group By实现原理详解 mysql group by本质上说是一种排序,可以利用索引事先排好序,如果不能直接从索引得出group by就需要在临时表排序
  2. ClickHouse之聚合功能源码分析 源码分析 | ClickHouse和他的朋友们(15)Group By 为什么这么快
  3. InterpreterSelectQuery::executeImpl->executeAggregation 初始化各种配置项,构造AggregatingStep,并将其添加到query_plan中
  4. AggregatingStep::transformPipeline 对于每个上游的数据流,会构造一个AggregatingTransform的节点进行预聚合,最后pipeline resize了一下,避免了下游单线程跑可能性能差的问题(https://github.com/ClickHouse/ClickHouse/issues/35096)
  5. AggregatingTransform::work->consume 预聚合阶段,通过调用aggregator.executeOnBlock(...)函数执行预聚合
  6. AggregatingTransform::work->initGenerate 扩展pipeline,合并阶段,当预聚合阶段结束(上游通道关闭,或者聚合行数到达了设定的上限),通过扩展pipeline替换上游节点,然后等待合并数据。这里根据aggregator在预聚合过程中是否因为限制内存使用而将数据写到磁盘文件,扩展的节点是不同的
    1. 如果没有写到磁盘文件,扩展ConvertingAggregatedToChunksTransform节点
    2. 否则扩展SourceFromNativeStream,GroupingAggregatedTransform,MergingAggregatedBucketTransform,SortingAggregatedTransform
    3. GroupingAggregatedTransform 将single_level block转化为two_level block,并按照block_num进行组合,然后交给MergingAggregatedBucketTransform进行合并操作
    4. MergingAggregatedBucketTransform 进行合并操作,因为MergingAggregatedBucketTransform可以有多个,因此合并阶段也可以是并行的
    5. ConvertingAggregatedToChunksTransform 如果预聚合数据是two_level block,则扩展节点进行并行合并,然后在本节点进行sort;否则直接在本节点合并
  7. ExecutingGraph::updateNode->AggregatingTransform::expandPipeline AggregatingTransform构造了一个新的input_port,和扩展节点中的最下游节点的output_port连接起来,下次执行prepare()函数的时候,获取的input_port是新构造的那个,这里实际上等价于切换了上游数据流,切换完成。从预聚合阶段切换到合并阶段的,这部分内容也是典型的运行时扩展Pipeline的案例:需要根据计算时的数据动态的判断之后需要执行的节点类型和结构
  8. Aggregator::Aggregator 记录预聚合前的内存使用,作为是否将预聚合数据写入磁盘文件的依据,每个聚合函数有个对应的State对象,该对象作为预聚合过程中内部数据的存储点,一个sql语句中可以有多个聚合函数,ClickHouse中是将多个聚合函数的State对象分配在一整块内存上的,因此,这里需要计算每个State对象的大小和偏移量,根据键类型选择合适的哈希表。
  9. Aggregator::chooseAggregationMethod 这里根据“grouping key” 的数量、特点(lowCardinality、isNullable、isFixedString)等性质,选择合适的哈希表类型,默认选择serialized类型的哈希表,这个哈希表的键就是将多个“grouping key”拼接,针对个别类型的哈希表,构造cache
  10. Aggregator::executeOnBlock 执行预聚合的接口
    1. initDataVariantsWithSizeHint result(类型AggregatedDataVariants)是一个out型参数,实际的哈希表也是在这个对象中,这里会执行初始化操作,即根据aggregator选择的哈希表类型来初始化对应的哈希表
    2. materialize columns ClickHouse中有些列不能在聚合操作中直接使用,比如Const Column、Sparse Column等。这里对“grouping key”中这些列做了具化处理(即格式转换为普通格式
    3. prepareAggregateInstructions 这个函数内部是聚合函数的参数拼接的过程,聚合函数的参数,根据名字找到对应的列数据
    4. executeWithoutKeyImpl / executeImpl 执行具体的聚合操作
    5. convertToTwoLevel和writeToTemporaryFile 聚合操作之后,判断是否要将单层哈希表转换为双层,以及是否将数据写到磁盘文件中。
  11. executeImpl->Aggregator::executeImplBatch 遍历需要聚合的行,对每一行我们计算其哈希表中的键,如果这个键在哈希表中不存在,则通过aggregates_pool->alignedAlloc申请一个内存块,并在内存块上初始化每个聚合函数的State对象,遍历聚合函数,依次执行预聚合操作
    1. 如果聚合key是8 bit, addBatchLookupTable8 数据根据key放到4*256的空间里面,256就是uint8的空间,4可能是因为前面把数据处理成这样。然后调用add, merge这个标准流程处理数据
    2. aggregates_pool->alignedAlloc申请内存给createAggregateStates创建新的State对象,这里是可以用上jit的
    3. addBatch 使用聚合函数做聚合

函数调用向量化实现

  1. ClickHouse源码笔记3:函数调用的向量化实现
  2. SELECT a, abs(b) FROM test 对应的stream ExpressionBlockInputStream->ExpressionBlockInputStream(生成abs(b),然后删除列b替换成abs(b))->TinyLogBlockInputStream 这块的实现已经改了,看ExpressionTransform
  3. ExpressionActions::executeAction-> FunctionUnaryArithmetic.executeImpl-> UnaryOperationImpl::vector-> AbsImpl.apply 这就是一个完美的向量化优化代码,没有任何if, switch, break的分支跳转语句,for循环的长度也是已知的

select where的实现

  1. ClickHouse源码笔记4:FilterBlockInputStream, 探寻where,having的实现
  2. FilterTransform::FilterTransform 寻找filter column的位置,判断是不是常量列
  3. FilterTransform::doTransform 执行filter表达式,生成FilterDescription并通过其过滤其他的每个列
  4. ColumnVector<T>::filter 对于每个列,遍历bool数组,然后将合法数据塞进一个新的列之中,最终新的列替换旧的列,就完成了一列数据的过滤。TargetSpecific::AVX512VBMI2::doFilterAligned也可以打开avx512指令集优化

排序实现

  1. ClickHouse源码笔记6:探究列式存储系统的排序
  2. PartialSorting->MergeSorting 两个重要的pipeline
  3. Partialsortingtransform::Transform->sortBlock 对单个block排序,相当于处理一个batch
  4. getBlockSortPermutationImpl->ColumnVector<T>::getPermutation->ColumnVector<T>::permute 先通过单列排序,拿到每一列在排序之后的IColumn::Permutation perm。然后Block之中的每一列都利用这个perm, 生成一个新的排序列,替换旧的列之后,就完成Block的排序了
  5. ColumnVector<T>::getPermutation 如果有limit做std::partial_sort,如果数字类型考虑使用基数排序,否则使用默认的快排
  6. ColumnVector<T>::permute 简单的根据map(排序列和被排序列之间的位置索引)生成一个新的排序后的列
  7. MergeSortingTransform::consume 从source stream读取到memory block,如果超出限额就持久化到文件做外部排序,增加BufferingToFileTransform,MergingSortedTransform两个processor
  8. 排序算法因为数据依赖没做太多simd优化,有些技巧估计还没普及

分布DDL

  1. ClickHouse 分布式DDL 执行原理剖析
  2. InterpreterAlterQuery::executeToTable->executeDDLQueryOnCluster 判断是否是分布式ddl
  3. DDLWorker::enqueueQuery 往zk的distributed_ddl目录提交queue-000xxx记录,创建ddl task和子目录(/distributrd_ddl/queue-000xxx, /distributed_ddl/queue-000xxx/active, /distributed_ddl/queue-000xxx/finished)
  4. getDistributedDDLStatus 阻塞等待各个节点工作完成,扫描zk目录(/distributed_ddl/queue-000xxx/finished)
  5. DDLWorker::runMainThread->scheduleTasks->processTask->tryExecuteQueryOnLeaderReplica 需要leader角色的抢leader锁
  6. replicated_storage->getStatus ->tryExecuteQuery 获得副本锁,提交非分布式但可能多replica的ddl任务-副本间任务({zk_path}/log/log-00000xxx {zk_path}/mutations/0000xxx)
  7. waitForLogEntryToBeProcessedIfNecessary 等待副本间任务都完成
  8. StorageReplicatedMergeTree::queueUpdatingTask->pullLogsToQueue 同步/{zk_path}/log/log-00000xxx到自己的副本目录/{zk_path}/replicas/{replica}/queue/queue-0000yyy
  9. BackgroundJobsAssignee::threadFunc->scheduleDataProcessingJob->executeLogEntry->MergeTreeBackgroundExecutor<Queue>::threadFunction()->task->executeStep() 每个副本拉取任务执行
  10. tryFinalizeMutations 更新表示完成的zk状态

join原理

本地join

  1. Nested Loop最基础的join方式,双重循环性能差
  2. Block Nested-Loop Join 使用join buffer,每次外层循环读取多条数据放进buffer,然后在内层一次执行完这多条外层数据的比较
  3. Sort Merge Join 外层,内层的数据先根据join key排序,然后有序merge,优点是使用内存可控
  4. hash join,小表放进hash,然后大表逐条记录hash lookup,如果小表无法放进内存就需要dump到文件,快但耗内存。

分布式join

  1. shuffle join 根据join key做hash把数据拆到不同的计算节点,这样每个节点都是local join,然后合并不同节点的结果
  2. broadcast join 小表广播,把小表复制到所有大表数据节点形成local join
  3. co-located join 数据已经根绝join key分好片了,天然的local join

Clickhouse的join

  1. clickhouse的本地join先尝试hash join,内存超出就merge join
  2. ClickHouse Join为什么被大家诟病? 
  3. 分布式join是两阶段执行的模型,协调者转发请求到多个worker节点,收集worker节点的计算结果做汇总计算
  4. 普通分布式Join,协调者分发左表到n个worker节点,worker节点去其他n个节点去拿右表的全部数据,形成n*n数据传输读取,ck未能真正实现shuffle
  5. global join,为了避免每个worker重复拉取一遍右表数据,改成由协调者拉取一遍右表数据然后传输到worker节点,能避免右表的重复计算,但是有大量的重复传输
  6. Colocate JOIN,可以在生成表的时候就按join key方式分片,然后用一个分布式表join一个local表,实现的效果就是colocate join 
  7. 两阶段模型,第二阶段如果计算密集比如count distinct协调者会成为瓶颈
  8. 不支持Shuffle,如果右表是大表,没法做多节点并行优化
  9. 有些场景过滤条件下推做的不好需要人工改sql优化
  10. 没有runtime filter,runtime filter在右表很小的时候可以作为filter提前应用到左表避免很多data load和hash probe

hash join流程

  1. ClickHouse之本地Join(hash join部分)源码分析(doing)
  2. FillingRightJoinSideTransform.work->HashJoin::addJoinedBlock->joinDispatch->insertFromBlockImpl->insertFromBlockImplTypeCase 使用右表构建hashtable,数据结构看RightTableData.maps
  3. JoiningTransform::transform->HashJoin::joinBlock crossjoin直接笛卡尔积,right或者full join物化一下稀疏或者常量列,然后转到joinBlockImpl
  4. joinBlockImpl switchJoinRightColumns返回一个row_filter,这个row_filter记录了左表block中哪些行需要保留,joinRightColumns查map看是否保留,block.insert(added_columns.moveColumn(i)) 将added_columns拼接到左表的block中,block.safeGetByPosition(i).column->filter(row_filter, -1) 这里执行真正的过滤操作

两阶段分布式执行

查询

  1. ClickHouse 分布式DDL 执行原理剖析
  2. StorageDistributed::read sql重写主要是替换库名表名,ClusterProxy::executeQuery主要是修改和设置一些配置 SelectStreamFactory::createForShard 决定选择本地副本还是远程副本
  3. pool->getManyChecked 这里会去连接池中获取远端的entry,entry中包含着请求其他server的必要信息 ConnectionPoolWithFailover::getManyImpl->getMany 真正获取entry并进行排序的过程
  4. 简单说就是遍历所有shard,然后选择合适的方式连接每个shard,生成对应的stream
  5. 生成stream的逻辑在emplace_local_stream,emplace_remote_stream等等几个匿名函数,实际读取数据的逻辑也在里面
  6. ReadFromRemote::initializePipeline会生成lazily_create_stream,里面会用到ConnectionPoolWithFailover::getManyImpl

写入

  1. 同步写入是指数据直写入实际的表中,而异步写入是指数据首先被写入本地文件系统,然后发送到远端节点。异步写是主流
  2. 主要逻辑就是根据sharding_key进行拆分,然后写到不同的位置,有目录加锁逻辑

一些优化点

groupBitmap替换uniqExact

  1. 为什么groupBitmap比uniqExact快?
  2. 该优化在select countDistinct的时候使用,高基维效果明显
  3. 关键区别在merge操作,只有分布式场景需要merge。分布式场景先在数据节点执行部分聚合,然后都传到计算节点做合并然后排序。决定是否做分布式聚合的代码在 Aggregator::mergeOnBlock

基于规则的优化

  1. ClickHouse SQL 的十项优化规则 多数优化可以在前面的ast重写的prewhere部分找到

一些其他系统的sql优化

  1. SQL 子查询的优化 关联子查询join下推,算子上提
  2. 子查询相关的优化 tidb的一些子查询优化,相对简单
  3. Max/Min消除,在索引列上的时候转换成order by limit 1就只用取一行
  4. 谓词下推,下推到理数据源尽量近的地方,很好理解
  5. Join Reorder 算法简介 Join Reorder,多表join,根据表的数据量分布选择合适的join顺序

jit原理

  1. JIT in ClickHouse
  2. USE_EMBEDDED_COMPILER 启动jit编译 ExpressionActions::ExpressionActions->ActionsDAG::compileExpressions->ActionsDAG::compileFunctions->compileFunction 这里实际生成llvm的ir,在子类的compileImpl里面补全更多细节(搜关键字llvm::IRBuilderBase) 一个很容易理解的例子是AggregateFunctionCount
  3. llvm生成ir的用法可以参考链接里面的例子
  4. SELECT a + b * c + 5 -> plus(plus(a, multiply(b, c)), 5) 会编译压平成一个function,尽量simd
  5. aggregate functions 有很多虚函数,有不少wrapper,也是通过jit压平。多个列上的聚合函数也可以合并成一个大聚合函数。
  6. 对于排序,如果要比较多个列就是多个虚函数调用,这里也可以jit压平成一个函数提升性能
  7. CHJIT::compileModule->JITCompiler.compile->llvm::legacy::PassManager 代码生成,结果存储在llvm::MemoryBuffer
  8. llvm::RuntimeDyld->loadObject->resolveRelocations 动态链接器符号解析,重定位
  9. compiled_module.function_name_to_symbol.emplace 维护<function name>到生成的符号的映射
  10. module_identifier_to_memory_manager[current_module_key] 维护函数(Module)到MemoryManager的映射

bitmap的一些细节

  1. createAggregateFunctionBitmap<AggregateFunctionGroupBitmapData> 创建bitmap
  2. SmallSet roaring::Roaring64Map roaring::Roaring 三种实现
  3. roaring_array_s 32位核心数据结构,维护container(数据桶), key(桶编号), typecodes(桶类型)等数据结构
  4. roaring_bitmap_and->container_and->bitset_bitset_container_intersection->bitset_container_and_justcard->_mm256_and_si256 and流程
  5. Roaring64Map 会重用32位的Roaring结构,RoaringBitmap64是由一系列RoaringBitmap32表示。实现方式有很多种,一种比较通用的做法用map存储,是把前32位存成key,value是后32所对应的RoaringBitmap32。前32位key的做法和32位roaring的高16位做法类似

action的一些细节

  1. action类型等等描述(ActionDAG.h),描述计算结构的一种通用结构,最有用的应该是function类型的actionType
  2. action创建是ConstInDepthNodeVisitor<ActionsMatcher, true> visit产生
  3. 通过ExpressionActions挂着的,貌似会挂到很多地方
  4. 实际执行在ExpressionActions::execute
  5. 执行点案例MergeTreeRangeReader::MergeTreeRangeReader

数据存储格式




2023年10月3日星期二

SIMD相关

原理

  • Add ALUs to increase compute capability
  • 应该就是增加计算单元提升算力,缺点可能是要占用芯片面积就没法加其他东西了,毕竟使用场景有限,所以avx512会因为体积和能耗被某些人鄙视。

字符查找

  •  Looking for an index of an element in array via SIMD. A fast way
  • 16个8位字符打包成128位__m128i ARR = _mm_setr_epi8(1,2,3...)
  • 设置被比较数字的vector__m128i N = _mm_set1_epi8(3)
  • 比较并得到按bit的mask,__m128i cmp = _mm_cmpeq_epi8(ARR, N); int mask = _mm_movemask_epi8(cmp);
  • _tzcnt_u32返回尾部最小有效零位的个数,也就是第一个满足条件的字符的位置

基数排序

  • Bitwise MSB Radix Sort on AVX-512
  • 基于bit的MSB基数排序,从高位开始每bit做比较像快排一样做左右划分,这样不需要额外的空间去做基数归类
  • simd排序有个麻烦的点是怎样搬动比较key之外的数据,这里的做法是看成kv并打包在一起,如果v是64位指针kv就是128位,512位的vector可以放4个,如果v是32位的位置kv是64位放8个
  • 每次拿vector中所有key中的一位进行比较得到一个mask, testAndCount(bitMaskVec, keyPayload, sortBits, popcnt)
  • 然后关键点是mask_compressstoreu这个指令,通过sortBits,把keyPayload符合条件的位对应的数据写到d + writePos[0],这样就实现了搬动数据
  • 但是感觉一位一位排序似乎效率有点低,把simd的优势又还回去了?

排序网络(Bitonic Sorting Network)

  • Bitonic_sorter 
  • 双调排序,适合并行度比较大的场景,双调序列是先单调递增然后单调递减的序列,将双调序列等分然后两个子序列一一比较得到min和max序列又都是双调序列。排序算法是先从原子一级级组成一整个双调序列,然后再一级级拆散最后就是排序好的
  • 复杂度是n*logn^2,但并行度很好
  • simd算法详见A Novel Hybrid Quicksort Algorithm Vectorized using AVX-512 on Intel Skylake

快速排序

  • Fast Quicksort Implementation Using AVX Instructions
  • 假设一次操作4个数,VBROADCASTSS (pivot), P (设置pivot)
  • VMOVDQU (in), D (取数组成向量)
  • VPCMPGTD D, P, C (向量和pivot比较)
  • VMOVMSKPS C, r (生成按bit的mask)
  • SHL $4, r VPERMILPS permTableLesser(, r), D, L VPERMILPS permTableGreater(, r), D, G (用mask查表得到两个向量内位置list,然后用vpermilps指令从源向量取数到目标寄存器,完成了partition这一步)
  • permTableLesser这种表是预先构建好的,因为mask寄存器只有16种可能对应的位置都是确定的
  • POPCNT r, r0 MOV $4, r1 SUB r0, r1 (计算partition后两边各有几个数)
  • VMOVDQU L, (Lptr) VMOVDQU G, (Gptr) (寄存器写回内存比如数组里面)
  • LEA (Lptr, r0, 4), Lptr LEA (Gptr, r1, 4), Gptr (移动数组指针)
  • 256位的AVX2增加了VPGATHERQQ指令,这样源向量里面可以存kv然后VPGATHERQQ只取出key来比较
  • AVX512增加了VCOMPRESSPS指令,直接通过mask写无需查表
  • 存kv始终是个痛点,放一起会减少向量里面的记录数,A Novel Hybrid Quicksort Algorithm Vectorized using AVX-512 on Intel Skylake 给的一个解决办法是value单独放一个等长的数组,根据mask partition的时候也给value数组操作一次

2023年9月14日星期四

Rocksdb源码笔记

重要流程

DBImpl::Get

  1. GetAndRefSuperVersion首先要获取super version,sv记录column family全局状态,有哪些memtable,有哪些sst等等。所以每次flush或者compaction之后都会更新sv
  2. 更新sv和读取sv可能发生冲突,所以用thread local机制来尽量避免锁
  3. 从super version读,读的顺序是mem(memtable), imm(immutable memtables), current(version sstables)
  4. MemTable::Get 先通过bloomfilter过滤 bloom_filter_->MayContain 然后MemTableRep::Get,如果是skiplist实现就是在skiplist里面找,还有hashskiplist,skiplist是比较均衡的选择
  5. MemTableListVersion::Get 顺着immutable memtable list查找。因为memtable是新写的数据所以总数据占比可能不高,命中率也可能不高
  6. Version::Get 遍历所有的sst files查找数据,遍历的过程l0顺序其它二分加上file indexer。处理每个sst文件是通过TableCache::Get,里面整合了各种读cache和读文件的过程
    1. 如果配置了row cache GetFromRowCache直接拿到kv值。cache key会用上sst file number做前缀,所以不会有update只有get失败时用insert填充
    2. BlockBasedTable::Get 封装了从block cache或者sst file读取的逻辑,这个talbe对应sst file number
    3. FullFilterKeyMayMatch 通过bloom filter快速过滤该sst file
    4. IndexIterator.seek(key) 在索引里寻找key对应的block(包含地址,大小等信息)
    5. BlockBasedTable::NewDataBlockIterator-> BlockBasedTable::MaybeReadBlockAndLoadToCache 从blockcache里面读取或者读sst file并放回block cache
    6. DataBlockIter.SeekForGet 读取block的实际内容
    7. DataBlockIter::SeekForGetImpl 通过hashmap寻找key对应的binary seek index位置,然后再通过这个位置读取block里面的data
    8. get_context->SaveValue 直接将Block中的数据地址赋给用户传进来的PinnableSlice*中(知名优化点)

DBImpl::WriteImpl

  1. WriteImpl 入口,write_thread_.JoinBatchGroup加入当前batch的线程组,抢主(cas队列头),如果失败需要等待到下一个可运行状态(几种情况比较细节)
  2. WriteBatchInternal::InsertInto 如果出来的状态是并发写入memtable,则自己不是leader且leader已经将数据写入wal,可以将自己的数据写入memtable
  3. write_thread_.ExitAsBatchGroupFollower 判断自己是否是最后一个写入memtable的线程,如果是则需要做最后的处理设置last seq、选出下一个正在等待的线程中的leader继续做group commit逻辑
  4. PreprocessWrite 切换wal,memtable等等写前预处理,从这里开始都是leader的行为
  5. write_thread_.EnterAsBatchGroupLeader 初始化batch group leader信息,处理max_size相关逻辑,CreateMissingNewerLinks(newest_writer)将当前writer链表组成双向链表语义上得到了一个batch
  6. bool parallel 判断是否能并发写memtable,重要条件是所有batch里面没有merge操作
  7. WriteToWAL 写wal
  8. WriteBatchInternal::InsertInto 写memtable,根据是否能并发选择写入方式,两个分支在调用参数上稍有区别,并发就只写当前batch,非并发就leader写所以batch。顺序写分支要先write_thread_.LaunchParallelMemTableWriters(&write_group) 唤醒所有其它writer
  9. should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w) 判断自己是否最后一个写memtable的线程,如果是就调用write_thread_.ExitAsBatchGroupLeader设置下一轮的leader

Compaction

  1. 需要关注的几件事,是否要compact,选择哪些文件compact以及怎样compact
  2. DBImpl::MaybeScheduleFlushOrCompaction 后台自动compaction的入口
  3. LevelCompactionPicker::NeedsCompaction 是否需要compaction,会逐层判断分数是否>1,分数计算在VersionStorageInfo::ComputeCompactionScore
    1. 对于L0,文件个数/level0_file_num_compaction_trigger得到一个分数,total_size/level_max_bytes_[base_level_]得到另一个分数取最大值
    2. 其它level,level_bytes_no_compacting/MaxBytesForLevel(level) 计算分数
    3. MaxBytesForLevel可以来自配置也可能动态算的,动态算法是先到数据量最大一层,然后按乘数因子递减
  4. DBImpl::BackgroundCompaction->LevelCompactionBuilder::PickCompaction 选择需要compact的文件
    1. SetupInitialFiles->PickFileToCompact 先选择startlevel和targetlevel,之前已经根据分数倒排序所以直接拿最上面那层做start,targe直接+1,PickFileToCompact 选择要compaction的文件,首先startlevel根据文件大小倒排然后选最大的,会用ExpandInputsToCleanCut把range overlap的都选进来,但在level compaction似乎没什么用,然后选择合适range空间的output files,GetOverlappingInputs,同样也会ExpandInputsToCleanCut也没啥用。选好后会跳过正在compact的文件。
    2. GetOverlappingL0Files 如果start level是L0逻辑会有区别,需要比较所有文件
    3. SetupOtherInputsIfNeeded->SetupOtherInputs 如果是L0,start在上面一步变化了,output也要相应变化
    4. GetCompaction最终返回一个Compaction对象
  5. CompactionJob::Prepare->GenSubcompactionBoundaries 需要并发的compact会被抽象成sub compactions,这里会解析生成sub compaction以及对应的边界
    1. 原理https://github.com/facebook/rocksdb/wiki/Subcompaction
    2. Compaction::ShouldFormSubcompactions满足条件才做sub compaction,leveled情况下L0的compaction或者kRoundRobin选文件或者手动状况下满足条件
    3. 遍历所有level所有file,生成对应的anchor,一个文件128archor,代表一个范围。然后排序去重
    4. 然后计算sub compaction并把所有的archor平均分,并保存在boundaries_里面
    5. 然后Prepare里面会生成sub compaction的列表sub_compact_states
  6. CompactionJob::Run 遍历sub compaction都放到线程池里面启动多线程compaction其中当前线程会分担sub_compact_states[0],执行的函数是ProcessKeyValueCompaction。执行完之后应该直接生成sst file没有合并这一步
    1. 实际上每次BackgroundCompaction一般是从start选一个文件output选多个文件,然后多次BackgroundCompaction形成并行关系,满足sub compaction条件之后可以进一步文件内部并行
  7. ProcessKeyValueCompaction 取出subcompaction的kv放到一个迭代器里面(此时会构造堆结构)
    1. VersionSet::MakeInputIterator 对L0每个文件建立一个TableCache::NewIterator对其它层整层建立LevelIterator
    2. NewCompactionMergingIterator 建立堆排序iterator
    3. CompactionIterator->c_iter->SeekToFirst 构建需要使用的iterator
    4. while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) 开始迭代输出
  8. SubcompactionState::AddToOutput 输出到文件
    1. open_file_func 没有builder的情况下新建table builder
    2. BlockBasedTableBuilder::Add 添加数据,flush的时候会创建index block
    3. CompactionJob::FinishCompactionOutputFile->BlockBasedTableBuilder::Finish 写各种index filter block完成文件

DeleteRange

  1. 参考https://rocksdb.org/blog/2018/11/21/delete-range.html
  2. 如果没有delete range,要用delete来做需要seek start, iterate and compare很慢。而且做scan的时候如果tombstone很多要一个个过没法快速跳过会很慢
  3. 基本的解决思路是写入的时候写入range tombstone,在memtable里面是range tombstone,在sst file里面有一个range deletetion block,读的时候通过在range合并之后的天际线里面二分查找快速判断是否删除,如果删除可直接返回
  4. range deletion block存储格式见https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format 末尾
  5. compaction或者flush的时候会清理过时的tombstone, tombstone到达sst最底层时可清除因为它就像个罩子来判断它下面层次的数据是否被删除
  6. DB::DeleteRange->DeleteRangeCF->DeleteImpl->MemTable::Add 会选择range_del_table_插入数据
  7. DeleteImpl->CheckMemtableFull 如果memtable满就flush这个带着tombstone的table
  8. ProcessKeyValueCompaction compaction过程迭代inputs
    1. CompactionRangeDelAggregator::AddTombstones 收集input里面的tombstones
    2. 如果是merge操作 MergeHelper::MergeUntil->ForwardRangeDelIterator::ShouldDelete 合并的时候判断key能否从range tombstone里面删除。如果当前key没有快照引用,或者版本比tombstone里面高,tombstone里面都可以删除
    3. 如果是put操作可以直接删除
    4. 没删掉的tombstone通过builder->Finish写入文件

DBImpl::IngestExternalFiles

  1. 参考https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files
  2. ReserveFileNumbersBeforeIngestion 获取下一个可用文件号
  3. ExternalSstFileIngestionJob::Prepare 将待导入文件拷贝/移动到db内部的sst文件中,导入前会判断如果多个文件是否有范围重叠
  4. WriteThread::EnterUnbatched 停写
  5. ExternalSstFileIngestionJob::NeedsFlush 判断是否要flush memtable,逻辑在ColumnFamilyData::RangesOverlapWithMemtables应该是简单的范围比较而不是遍历每个key。然后flush,DBImpl::FlushMemTable
  6. ExternalSstFileIngestionJob::Run 实际ingest逻辑
    1. CheckLevelForIngestedBehindFile 如果有ingest_behind标志,直接尝试放最后一层,如果有key重叠返回失败
    2. AssignLevelAndSeqnoForIngestedFile 从L0往下看每层首先CompactionPicker::RangeOverlapWithCompaction是否和本层正在进行的compaction output的范围冲突,Version::OverlapWithLevelIterator,IngestedFileFitInLevel(两个类似)再检查每个sst file是否有冲突
    3. AssignGlobalSeqnoForIngestedFile 为ingest_file获取global seq no
    4. edit_.AddFile 更新文件元信息到VersionEdit
  7. write_thread_.ExitUnbatched 恢复写入

Version相关

  1. 数据库lsn记录在last_sequence_,在DBImpl::WriteImpl每次写都会增加
  2. DBImpl::GetImpl读取的时候会使用seq构造LookupKey去读,seq来源GetLastPublishedSequence,LookupKey的构造类似于user_key + sequence + type
  3. 这个lookupkey有三种使用方法memtable_key(全部内容),internal_key(去掉长度描述),user_key(再去掉sequence和type只留下外部传入的key)
  4. 在memtable里面iterate的时候根据internal_key来寻找第user_key相同internal_key大于等于的key
  5. TableCache::GetFromRowCache row cache在查找的时候会用row_cache_key去查找,row_cache_key的构成是fd_number+seq_no(正常情况可能是0)+user_key。这就清楚了为什么不用update row cache了,其实还是和sst file关联的

性能优化

FileIndexer

  1. 大体思路是上一层确定了key在某个文件的范围,但是又不在这个文件中的时候可以利用这个信息加快下一层的查找。
  2. 可以做一个文件相对位置的索引,上一层的每个文件的start和end对应下一层的文件和位置

减少一次Get过程中的内存拷贝

  1. 问题是之前的版本数据从sst读取之后需要一次copy给返回给用户的变量,那明显的解法是直接返回sst读取之后内存块里面对应的地址
  2. 带来的问题是DataBlockIter本来是填充block cache之后就释放,如果返回地址给读接口,可能读接口拿到值引用时空指针
  3. 解决方案是引入了Cleanable接口,然后DataBlockIter的clean委托给PinnableSlice(Get使用的结构),等Get使用完了以后一起释放

降低Statistics代价

  1. 使用CoreLocalArray数据结构保证数据不会跨cpu访问

写wal优化

  1. 多线程写wal的时候会有竞争,所以比较好的方式是一个线程收集所有线程的写请求batch写,写完之后通知其他线程继续,可以恢复成多线程写各自的memtable
  2. 这里的线程协调有个很微妙的性能点,leveldb使用在MutexLock里面pthread_cond_wait的方式来让其他线程等待,这样会导致context switch比较重。注意虽然pthread_mutex_lock使用futex有spinlock快速返回的逻辑,但是pthread_cond_wait没有。所以这里需要自定义比较精细的wait逻辑disruptor里面其实也需要
  3. 代码实现在WriteThread::AwaitState,首先busy loop并使用asm volatile("pause")避免cpu流水线重排,如果条件不满足再进入short wait,使用yield,还不行就进入long wait,使用cond.wait()

二分查找cache miss问题

kDataBlockBinaryAndHash

  1. 参考https://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
  2. 简单说就是加个hashmap索引key信息的相对位置

阿里x-engine的解法

  1. 参考https://zhuanlan.zhihu.com/p/114681578
  2. 如果用有序数组存key,value-address,在二分查找过程中要反复跳地址,只要跳动的长度大于cache line的长度就无法使用上次缓存就会cache miss
  3. 解决办法是用两层的b+ tree,b+ tree内节点只存放key所以很紧凑,寻址过程可以充分利用缓存。两层是实践选择

InlineSkipList

  1. 参考RocksDB 源码分析 – InlineSkipList
  2. 节点的指针和key存在一起,上面n层节点指针的位置分别在数组对应的-n,-(n-1),...-1的位置,因为skiplist常用操作是从上层节点一个一个往下找,所以变成了顺序而且局部的内存访问
  3. Splice主要是记录上次insert时候每层的range,如果是顺序插入就可以利用上次的range迅速缩小范围
  4. 并发比较普通,每一层独立做CAS

并发相关

ThreadLocalPtr

  1. 参考https://zhuanlan.zhihu.com/p/398409455
  2. 每个线程一个ThreadData,里面有个vector存储多个thread local数据对象。ThreadData组成链表方便一起处理
  3. ThreadLocalPtr::StaticMeta是个singleton,所有的ThreadLocalPtr都指向它

LRU Cache

  1. LRUCache最外层实体 LRUCacheShard缓存分片 LRUHandle基本存储元素封装k,v
  2. LRUCacheShard::Lookup 读链路,加锁(本分片锁),ref+1设置hit,没有传统lru的移动位置操作。lookup之后要调用Release
  3. LRUCacheShard::Insert 写链路,参数可带优先级,加分片锁,先看是否需要释放如果需要就先从尾部删除,然后插入table,然后从顺着优先级往下看哪里有空间就插到对应的head上。
  4. LRUCacheShard::LRU_Remove 删除,变化指针以及三个优先级的容量
  5. LRUCacheShard::Release get使用结束以后需要调用release接口,引用计数-1,然后判断引用计数是否=0,=0且容量不足就删除这个节点,=0有容量会调用LRU_Insert插回去。>0就什么都不做
  6. 所以lru的逻辑就是lookup过的item(hit),在引用变0的时候如果容量够有一次机会从高优先级区开始重新插入,而优先级的逻辑就是低优先级插到队列中间偏后的位置,淘汰的比较快一些
  7. 总结来看是一个比较标准的带分片和优先级的lru实现

Clock Cache

  1. LRU Cache的问题是每次读会导致重新插入,锁竞争激烈
  2. 大致逻辑是,分片,每个分片一个环形队列,释放空间时循环扫描如果entry上次扫描之后又被置了访问标记就清理标记继续,否则就干掉entry。这应该是rocksdb某个版本的实现
  3. 这个简单算法的问题是如果整体访问很多都被置了标志就退化成fifo了,解决方案是Two-Handed Clock,有两个指针,一个负责周期性扫描清理标记,另一个负责看被清理的标记是否又被置位如果置位说明访问频繁,否则可以干掉,扫描速度也可以自调整
  4. Rocksdb的HyperClockCache基本思路也是这样,细节还需要研究

MultiGet为啥快

  1. https://github.com/facebook/rocksdb/wiki/MultiGet-Performance
  2. 少了很多虚函数调用
  3. block index和filter都放在lru里面会有锁竞争,批量访问同一个sst file的key就免了这些竞争,不过clock的方式就没有这问题?
  4. 每次访问sst会访问bloom filter会带来cache miss,batch操作可以用pipeline的方式隐藏cache miss latency,这个技巧比较高端
  5. 单次请求多个访问同一个sst file可以通过io uring的接口io并行访问,多个请求就比较难这样做

存储格式

  1.  sst文件格式 https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format
  2. key的存储做了delta压缩,每隔k个key会有一个kv不做delta压缩被称作restart point这个被用来辅助binary search
  3. kv实际存下来shared_bytes:unshared_bytes:value_length:key_delta:value

    和hbase性能比较

    1. hbase get需要寻址region,当然结果可以被缓存
    2. leveled compaction对比类似universal compaction,以及rocksdb的file indexer
    3. hbase hfile是多层index block+data block,rocksdb是block meta block + block(index part+data part) meta block一般常驻内存
    4. block mem 内查找过程 hbase看起来是线性查找,rocksdb是binary+hash辅助
    5. block cache hbase一般存索引节点,rocksdb除了block meta还可以存 data block
    6. rocksdb有row cache
    7. flush 简单ab table vs 多列族导致复杂slab分配
    8. blockcache clockwise vs dumb lru
    9. memtable 超级优化vs ConcurrentNavigableMap
    10. 参考https://bbs.huaweicloud.com/blogs/192853

    问题

    block cache什么时候更新的?

    因为是block级别的所以应该不会被更新,只会随着compaction之类的动作被重置

    row cache什么时候更新的?

    不用更新,查询的时候sst file no会在条件里面

    redis足够快了吗?

    本质说就是一个单线程hashmap get操作,expireIfNeeded->dictHashKey->dictGetVal核心链路就是这样,除了可以干掉expire,让hash算法更简单一点外,似乎没有很大程度优化空间

    speedb

    性能点

    1. 写放大系数降低80%,重写了lsm tree https://www.linkedin.com/pulse/speedb-dramatically-improves-rocksdb-response-times-performance/,这一条似乎缺乏其他资料印证
    2. 读稳定性和p99提升,p99稳定大约是rocksdb的1/4,底层算法提升,compaction动态自适应调整,根据workload调整flush和compaction
    3. speedb认为rocksb读延迟高不是因为lsm tree,而是compaction相关缺乏资源管理

    优化细节

    1. snapshot-optimization https://docs.speedb.io/speedb-features/snapshot-optimization 缓存snapshot,减少获取snapshot锁冲突
    2. Write Flow optimization https://docs.speedb.io/speedb-features/write-flow
      1. rocksdb写wal是append只能单线程,memtable switch会block,写流程中还有其他不少需要全局锁的地方
      2. spdb改变了写流程,独立一个线程处理memtable switch,flush request到memtable,wal switch等等,这个线程用两个container(memtable?不确定)切换。
      3. 多个write batch组成一个batch group,以group级别刷wal,可以多个group并发刷wal
      4. 主要的优化点是db锁变成读写锁,写wal和memtable变成并行,写wal是计算文件位置再写,可以并行。memtable和wal切换都不会有db锁不会block整个流程
      5. 新流程带来的一个复杂度代价是写memtalbe失败需要rollback wal写
      6. 该优化带来近两倍的写吞吐,但个人理解并不能降低cpu使用率
    3. Global Delayed write https://docs.speedb.io/speedb-features/global-delayed-write, 把WriteController从per db变成全局,做全局的流量分配。这个知道就好
    4. Proactive Flushing https://docs.speedb.io/speedb-features/proactive-flushing flush行为不再是被动发生在write调用的时候,判断依据不再是年龄而不顾数据多少
    5. Sorted Hash Memtable https://docs.speedb.io/speedb-features/sorted-hash-memtable 要并发写,fast read,又要能scan,默认数据结构中只有skiplist,shorted hash map的结构是一个concurrent hashmap+一个list,这个list分成无序和多个有序的vector部分,读很简单hash o(1),写同时写hashmap和无序list,后台线程整理无序list成有序vectors,整理发生在无序size超过10000或者发生scan的时候。不得不说这种根据场景打破常规做法的数据结构比较牛逼。写提升155%,随机读提升50%
    6. Paired Bloom Filter https://docs.speedb.io/speedb-features/paired-bloom-filter
      1. 原始的bloom filter局部性很差,对于一个positive key查询会有k(函数数量)次cache miss
      2. blocked bloom filter是个改进,会把内存划分成blocks,每个block大小等于cache line size(512b),对于每个key先拿一个函数映射到一个block,再拿k个函数在block内做原始bloom filter。这个是rocksdb的默认实现方式
      3. 但问题是随着key的位数增长,fpr增加得比较多,核心原因是每个block的key的数量差异很大
      4. 解决方式是通过pair来减少key的数量差异,先做block映射,然后把block分片成很多batch,一个batch 128 block,然后通过batch内通过block key的数量排序block首尾匹配成64个pair,key的前k/2个函数映射到一个block,后k/2映射到paired block,block在batch中的位置需要额外几位来存
      5. Ribbon filter比较省内存,但是耗4-6倍cpu,所以看场景
      6. 随机读比默认算法吞吐翻倍
    7. Range Delete Improvement https://docs.speedb.io/enhancements/remove-single-delete-elements-during-memtable-flush 原来的实现delete range的key还是会flush到sst,这个变更在flush过程中会过滤,一个比较细节的提升
    8. Dynamic Delayed Writes https://docs.speedb.io/enhancements/dynamic-delayed-writes 更精细的delayed write rate计算方式让变化更线性
    9. Reduce switch memtable latency https://docs.speedb.io/enhancements/reduce-switch-memtable-latency 提前准备用于switch的memtable,在切换的时候更平滑,但是要付出额外内存的代价


    参考链接