2023年10月9日星期一

ClickHouse学习笔记

一直对大数据计算很感兴趣,ClickHouse又是感觉很酷的实现,所以花了一段时间研究,整理在这里。

Debug配置

debug上面踩了不少坑。
  1. 首先是用macbook可以build但要花4,5个小时,用台式机会快很多。
  2. 在老一点版本的clion上可以用gdb+'CMAKE_BUILD_TYPE=RelWithDebInfo'。新版默认是lldb和RelWithDebInfo同时用有点问题,捕捉不到断点。
  3. 最后解决办法是在ubuntu台式机上编译,RelWithDebInfo模式,Debug模式还是有点慢而且debug过程中异常退出的概率有点大,然后clion启动remote debug连接ubuntu上面的gdbserver
步骤
  1. ubuntu上面build CMAKE_BUILD_TYPE=RelWithDebInfo
  2. 启动gdbserver gdbserver :1234 build_light/programs/clickhouse server --config-file=/home/hy/config.xml 需要config.xml打开clickhouse的远程连接
  3. clion上面启动remote debug,关键步骤是需要把ubuntu上面编译的可执行文件下载下来,然后设置成debug环境的Symbol file
  4. 在clion的gdb交互窗口输入 handle SIGUSR1 noprint nostop ,handle SIGUSR2 noprint nostop 或者加到gdbinit
  5. stl pretty print,如果有python问题,可以使用自己的gdb。clickhouse使用的是libc++而不是libstdc++,使用https://github.com/koutheir/libcxx-pretty-printers
  6. 查看stl range的方法,reinterpret_cast<uint64_t &>(key_range.left.storage)

重要流程

启动

  1. main(main.cpp)->mainEntryClickHouseServer->Poco::Util::ServerApplication:run->Server::main(Server.cpp)->Poco::Net::TCPServer.start

请求处理

  1. TCPHandler::runImpl->receivePacket->executeQuery->processInsertQuery/processOrdinaryQuery
  2. bio的模式一个线程一个连接

词法分析

  1. Lexer::nextTokenImpl 典型的一个一个token的处理会回看上一个token

语法分析

  1. IParserBase::parse 所有类型parser都会经过这一步,在expected里面记录parse的轨迹。add过程可能触发Lexer::nextTokenImpl(看index是否大于现在scan过的)
  2. ignore方法寻找并跳过某个关键字
  3. 最简单的例子'select * from test',调用链ParserQueryWithOutput::parseImpl->ParserSelectWithUnionQuery::parseImpl->ParserUnionList::parseImpl->ParserUnionQueryElement::parseImpl->ParserSelectQuery::parseImpl(这里逻辑最多)->ParserTablesInSelectQueryElement::parseImpl->ParserTableExpression::parseImpl->ParserCompoundIdentifier::parseImpl(到这里生成table的ast) 经典的递归下降,会产生ast

AST重写

  1. InterpreterSelectQuery::InterpreterSelectQuery->ApplyWithSubqueryVisitor 把with对应的subquery解析出来放map里面
  2. ApplyWithAliasVisitor 也是处理with好像只全局处理没有递归下降
  3. RewriteCountDistinctFunctionVisitor 把count(distinct)替换成count(group by)
  4. QueryAliasesVisitor 访问ast把所有的别名放在一个map里面
  5. ExecuteScalarSubqueriesVisitor 把子查询替换成常量,先查缓存,如果查不到使用interpreter getQueryInterpreter解释执行成block然后缓存。worthConvertingToLiteral 复杂类型不做替换需要的计算比较多
  6. QueryAliasesMatcher 给每个子查询设置一个单独的别名
  7. rewriteMultipleJoins 重写join 
    1. CrossToInnerJoinMatcher 把不同表之间的=表达式(外部)移到join里面 
    2. JoinToSubqueryTransformMatcher 'select * from t1 join t2 on ... join t3 on ... join t4 on ...' 改写成 'select * from (select * from t1 join t2 on ...) join t3 on ...) join t4 on ...'
  8. JoinedTables::rewriteDistributedInAndJoins
    1. InJoinSubqueriesPreprocessor 关联到distributed_product_mode配置,意义在于分布式子查询的优化,最理想的模式是被关联的表sharding的状态下在每个sharding上做local关联。参考分布式子查询。代码做的事情较简单,根据实际情况加GLOBAL,或者改库表名成remote模式这样分布式存储可以识别并本地化
    2. RenameQualifiedIdentifiersVisitor 改写子查询中的列名,去使用新的表名
  9. additional_table_filters 参考core settings 额外过滤条件
  10. joined_tables.makeTableJoin->replaceJoinedTable 谓词下推,把join table替换成(SELECT * FROM table_name) AS table_short_name

继续AST重写

  1. analyzeFunctionParamValues->FunctionParameterValuesVisitor 收集参数视图参数,replaceQueryParametersIfParametrizedView 参数替换
  2. replaceWithSubquery 在查询语句里用物化视图的sql语句替换视图名
  3. TreeRewriter.analyzeSelect 一系列处理比如别名替换,标量子查询替换成常量,多余的列删除...
  4. renameDuplicatedColumns 重复列改名
  5. TreeOptimizer::optimizeCountConstantAndSumOne 语义和count等同的聚合函数改写成count
  6. translateQualifiedNames db.table.column, table.column类型的名字变成正常的名字,*这样的列名扩充成实际列名
  7. LogicalExpressionsOptimizer 布尔表达式优化,一串or变成in
  8. TreeRewriter::normalize 用户定义的sql function apply(函数名替换成函数体),count_distinct,distinct_if等等几个函数apply,EXISTS(subquery)替换,位置参数改成列名,nullin系列表达式处理,函数名改成小写,公共子表达式删除
  9. expandGroupByAll 把group by all里面的all变成实际列
  10. removeUnneededColumnsFromSelectClause 冗余列删除
  11. executeScalarSubqueries 标量子查询替换
  12. PredicateExpressionsOptimizer.optimize where和prewhere里面的谓词下推到子查询里面,https://github.com/ClickHouse/ClickHouse/pull/2015#issuecomment-374283452
  13. TreeOptimizer::optimizeIf 完成了子查询标量替换之后先尝试直接判断标量把整个表达式替换成条件分支,optimize_if_chain_to_multiif 把三元表达式优化成multiple if
  14. TreeOptimizer::apply后面有一批优化在这个函数里面
  15. optimizeFunctionsToSubcolumns 类似于这样'length(arr)' -> 'arr.size0'
  16. optimizeAggregationFunctions 聚合函数里面的数学运算外提'sum(a * 2)' -> 'sum(a) * 2'
  17. convertQueryToCNF 转换成cnf形式,简单说就是最外层全是and这种形式在某些情况更容易使用索引
  18. optimizeWithConstraints 使用constraints关键字的信息干掉查询中多余的部分
  19. optimizeSubstituteColumn 使用constraints信息做列替换
  20. optimizeGroupBy 从groupby里面干掉单射函数和常量表达式
  21. optimizeGroupByFunctionKeys干掉groupby里面的参数不是groupby key的function(不太确定?)
  22. optimizeAnyFunctions所有操作移动到any function之外any(f(x, y, g(z))) -> f(any(x), any(y), g(any(z)))
  23. optimizeCountConstantAndSumOne 把count(常量)和sum(1)这种的简单变成count
  24. optimizeMultiIfToIf multiIf->if在参数个数合适的情况下
  25. optimizeSumIfFunctions 类似sumIf(123, cond) -> 123 * countIf(1, cond)
  26. optimizeArrayExistsFunctions 类似于arrayExists(x -> x = 1, arr) -> has(arr, 1)
  27. optimizeInjectiveFunctionsInsideUniq移除uniq里面的单射函数
  28. optimizeAggregateFunctionsOfGroupByKeys移除group by key上面的min/max等等聚合函数
  29. optimizeRedundantFunctionsInOrderBy ORDER BY x, y, f(x), g(x, y), f(h(x)), t(f(x), g(x)) -> ORDER BY x, y
  30. optimizeMonotonousFunctionsInOrderBy order by里面的单调函数替换成其参数
  31. optimizeDuplicatesInOrderBy 移除order by里面的多余元素,可能是之前的优化产生
  32. transformIfStringsIntoEnum if中的string参数转换成enum
  33. optimizeLimitBy干掉limit by中的重复元素
  34. optimizeUsing干掉using中的重复列
  35. optimizeOrLikeChain 'or xxx like '->multiMatchAny
  36. replaceAliasColumnsInQuery 查询里面的别名展开替换 `day` Date ALIAS toDate(timestamp),`day1` Date ALIAS day + 1,`day2` Date ALIAS day1 + 1 查询条件where day2 = today()展开成 ((toDate(timestamp) + 1) + 1) = today() 
  37. RewriteOrderByVisitor ORDER BY (a, b) -> ORDER BY a,b
  38. MergeTreeWhereOptimizer 计算并尝试把where里面的表达式移动进prewhere,主要是根据一些特殊表达式里面可以判断列的大小来做判断,比如"column_name = constant"
  39. generateFilterActions 根据additional_filter_ast生成filter action
  40. ExpressionAnalyzer::getRootActions 调用ActionsVisitor遍历AST表达式树,生成ExpressionActions,结构不复杂就是遍历ast生成对应的action

执行计划生成

  1. InterpreterSelectWithUnionQuery::execute->buildQueryPlan->executeImpl
  2. prepared_pipe时会增加ReadFromPreparedSource,从远程表读取
  3. executeFetchColumns 作用是返回读取columns数据的stream,可以理解为最基础的数据拉取stream,有count优化,limit优化等等
  4. preliminary_sort 添加order by, limit, distinct等等step
  5. expressions.first_stage 只用远程执行query的场景,添加step,FilterStep,before_array_join,ArrayJoinStep,before_join,converting_join_columns,join step(稍复杂),where step,aggregate step
  6. need_aggregate 有聚合的场景分片上不执行later-stage动作,在windows函数/ORDER BY之前不执行,need_aggregate=false 一些其它的执行顺序的逻辑
  7. expressions.second_stage 分布式查询场景中的聚合节点
  8. executeMergeAggregated从不同节点拉数据合并,having rollup cube等等逻辑
  9. executeWindow 添加windows函数相关的steps
  10. order by相关逻辑,有一些merge sort的逻辑
  11. limit projection offset相关的处理
  12. executeWithFill 处理with fill逻辑
  13. executeSubqueriesInSetsAndJoins 需要子查询结果构建set

pipeline构建

  1. ClickHouse 源码解析(一):SQL 的一生(上)
  2. QueryPlan::buildQueryPipeline->optimize
    1. QueryPlanOptimizations::tryRemoveRedundantSorting 尝试删除多余排序
    2. QueryPlanOptimizations::optimizeTreeFirstPass 深度遍历plan tree,挨个执行getOptimizations里面所有的优化比如 tryPushDownFilter等等
    3. QueryPlanOptimizations::optimizeTreeSecondPass 同样是遍历plan tree
    4. optimizeReadInOrder-同时使用limit和order by如果order by和表顺序一致的优化,变成FinishSorting
    5. aggregation_in_order-GROUP BY表达式至少包含排序键的前缀或注入函数。当从表中读取一个新的键时,聚合的中间结果可以被最终确定并发送给客户端,distinct_in_order distinct上面类似的优化,
    6. optimizePrimaryKeyCondition-不太明白,enableMemoryBoundMerging-enable_memory_bound_merging_of_aggregation_results相关的设置
  3. while (!stack.empty()) ... DFS方式生成pipeline,生成过程调用updatePipeline函数
  4. ISourceStep::updatePipeline(创建并初始化pipeline)->ITransformingStep::updatePipeline(往pipeline添加transformer)
  5. ReadFromMergeTree::initializePipeline->spreadMarkRanges->readInOrder->createSource 这样就有了从源数据读的processor
  6. LimitStep::transformPipeline 给pipeline添加transformer

pipeline执行

  1. ClickHouse 源码解析(二):SQL 的一生(中)
  2. ExecutingGraph::initializeExecution入口函数,将没有出边的node入栈(direct_edges),然后弹出反复调用updateNode
  3. ExecutingGraph::updateNode,做反向状态传播把output node需要数据的状态传播到input,后面是细节展开
  4. processor.prepare(node.updated_input_ports, node.updated_output_ports) 调用 Node 对应 IProcessor::prepare() 方法尝试pull数据
  5. IOutputFormat::prepare(),更新相邻边的状态和自己节点的状态,如果没有数据就设置成NeedData
  6. if (!need_expand_pipeline) 将 Node 相邻的待更新 Edge 放入 update_edges 这个栈中
  7. if (updated_processors.empty()) 简单来说,就是将Edge所指向的Node放入update_processors栈中
  8. ISource::prepare()最终会执行到这里,由于ISource没有 input,则直接返回Status::Ready
  9. case IProcessor::Status::Ready: queue.push(&node) input节点放入入口传入的queue里面,ExecutingGraph初始化完成
  10. ClickHouse 源码解析(三):SQL 的一生(下)
  11. PipelineExecutor::executeImpl 调度执行每个节点 executeStepImpl 单线程执行分支
  12. tasks.tryGetTask->context.executeTask->graph->updateNode执行完节点后更新相邻节点的状态->tasks.pushTasks新的task入全局队列
  13. executeTask->executeJob->IProcessor::work()算子处理数据、传递数据的关键方法
  14. ISource::work() tryGenerate()生成数据返回chunk
  15. spawnThreads() 多线程分支,比较简单,起多个线程threads.emplace_back(后面有没有线程池不确定) 同时从async_task_queue里面拿数据执行,执行流程和单线程类似

MergeTree读链路

  1. CLICKHOUSE 源码解析: MERGETREE READ PATH
  2. InterpreterSelectQuery::executeFetchColumns->StorageMergeTree::read->MergeTreeDataSelectExecutor::read 入口
  3. MergeTreeDataSelectExecutor::filterPartsByPartition 根据partition(比如date分区)筛选parts selectPartsToRead里面根据minmax_idx_condition过滤。之前的调用链MergeTreeDataSelectExecutor::read->readFromParts->ReadFromMergeTree::initializePipeline->getAnalysisResult->selectRangesToRead
  4. MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes 根据主键或者索引筛选出要读取的part range
    1. markRangesFromPKRange 根据 primary key筛选part下的ranges pk在内存里
    2. filterMarksUsingIndex 根据二级索引筛选ranges 可能会读磁盘
    3. process_part 可能并行选取ranges,这里为止都属于selectRangesToRead
  5. ReadFromMergeTree::spreadMarkRangesAmongStreams->ReadFromMergeTree::read->ReadFromMergeTree::readFromPool 后面是实际读取数据的过程
  6. MergeTreeReadPool::fillPerThreadInfo 将读取任务分配到多个线程,按Disk分配线程负载,最大化磁盘IO带宽利用率,将所有 Part 按物理存储磁盘排序,按序将磁盘负载分散到 N 个线程
  7. MergeTreeReadPool::getTask 并不会将线程需要完成的所有任务初始时全部分配,而是每次仅会分配一个小量级的数据读取任务。通过参数: merge_tree_min_rows_for_concurrent_read / merge_tree_min_bytes_for_concurrent_read 配置读取的数据量级。
  8. MergeTreeRangeReader::read 先读取Pre-Where相关Columns,过程中,还会对数据进行Filter,如果 Filter后无数据,便不会进行后续的Read Remaining Columns步骤。
  9. MergeTreeReaderWide::readRows 函数读取范围行数内的所有列的数据。分别对每一列数据,使用对应的反序列化方法进行读取 readRows->readData->serialization(...)反序列化
  10. SerializationString::deserializeBinaryBulk->deserializeBinarySSE2 读数据并复制到data,可以开simd优化,_mm_storeu_si128 / _mm_loadu_si128为C++编译器提供的SIMD内置函数 _mm_loadu_si128 -> SSE2 指令 MOVDQU _mm_storeu_si128 -> SSE2 指令 MOVDQU 此处为较为简单的 SIMD 应用场景: 快速内存复制 (一次性复制更多字节内存)

MergeTree写链路

  1. CLICKHOUSE 源码解析: MERGETREE WRITE-PATH
  2. InterpreterInsertQuery::buildChainImpl->SinkToStoragePtr write->MergeTreeSink::consume 入口
  3. MergeTreeSink::consume 将Block按照Partition拆成N个Block, 写入Part到Temp目录防止Dirty Read, 将TempPart挂载到表空间下, 这样便可以被客户端查询到数据, 触发一些后台执行的任务(例如执行Merge)
  4. MergeTreeDataWriter::splitBlockIntoParts通过executePartitionByExpression()计算出与partition相关的字段(可能会新增字段),通过buildScatterSelector()计算出row与partition之间的映射关系,通过IColumn::scatter()将每个Column都完成一拆多,最终组合为多个Block
  5. MergeTreeDataWriter::writeTempPart 写数据的具体实现
    1. minmax_idx->update(…) 对 Partition 涉及的字段统计 min-max 值,作为额外的谓词下推索引
    2. stableGetPermutation() 对 Block 数据按照 Order By 字段排序,会预先检查是否排序,因此如果数据已经排过序再写入,事实上写入速度会有提升,stableGetPermutation 为稳定排序, 底层实现为 std::stable_sort,时间复杂度为 O(N((logN)^2))
    3. updateTTL(…) 是为支持 TTL 数据生命周期管理做的数据 min-max 索引,用于快速判断一个 Part 是否命中 TTL
    4. MergeTreeData::createPart 创建 Part 实例 注意 choosePartType() 会按照 Block 大小和行数决定是否使用 Compact/Memory 存储方式,默认为 Wide 方式存储
    5. MergedBlockOutputStream::writeWithPermutation, 实现Block数据的写入 MergedBlockOutputStream::writeImpl->MergeTreeDataPartWriterWide::write首先由computeIndexGranularity() 计算当前Block的IndexGranularity getBlockAndPermute->column->permute会对column排序 writeColumn 将每个 Column 分别写入磁盘,calculateAndSerializePrimaryIndex、calculateAndSerializeSkipIndices 写入主键、二级索引
    6. MergeTreeDataPartWriterWide::writeColumn对Column中的每个granule分别执行 writeSingleGranule()写入Column内容到.bin flushMarkToFile()写入Column.mrk索引
    7. MergeTreeDataPartWriterWide::writeSingleGranule->SerializationString::serializeBinaryBulk(完成序列化,根据数据类型会有不同)ColumnString中的多个StringValue是连续组织的,使用Offsets来记录不同的String边界。序列化时,对每个StringValue先写入长度,再写入String内容,这里是写入到WriteBuffer中,真正落地到磁盘还会经过一次压缩

MergeTree Merge

  1. ClickHouse 源码解析: MergeTree Merge 算法
  2. 每次插入数据都会在后台尝试merge,但可能因为选不出合适的part跳过
  3. 选择part的逻辑,首先加入只选择相邻part的限制,然后遍历所有组合两层for循环组合嵌套,选择min sum_parts_size / (parts_num - 1),就是尽量选小而多的组合,比较碎片。整体上说应该是universal merge,n层几个merge出来生成n+1层
  4. merge算法是k-way merge+k堆,最常规做法。合并数据有两种做法,Horizontal 与 Vertical,Horizontal简单每次写一整行,Vertical按列写比较向量化,先写主键以及对应的part,然后拿着part信息再写其它列
  5. StorageMergeTree::scheduleDataProcessingJob->SimpleMergeSelector::select 选择需要merge的parts的具体实现
  6. MergingSortedAlgorithm::mergeImpl Merge的实现,就是循环的从待Merge的Part中(存放于queue对象中),找到最小的数据行,并将其放到输出结果集中queue的实现为SortingHeap。针对 Vertical 算法,需要额外输出 row_source,即输出数据行对应的原始 Part 编号
  7. ColumnGathererStream::gather VerticalMergeStage 阶段的主要任务就是为剩余的Gatherer Columns 包含的列进行输出

Group By原理

  1. MySQL分组查询Group By实现原理详解 mysql group by本质上说是一种排序,可以利用索引事先排好序,如果不能直接从索引得出group by就需要在临时表排序
  2. ClickHouse之聚合功能源码分析 源码分析 | ClickHouse和他的朋友们(15)Group By 为什么这么快
  3. InterpreterSelectQuery::executeImpl->executeAggregation 初始化各种配置项,构造AggregatingStep,并将其添加到query_plan中
  4. AggregatingStep::transformPipeline 对于每个上游的数据流,会构造一个AggregatingTransform的节点进行预聚合,最后pipeline resize了一下,避免了下游单线程跑可能性能差的问题(https://github.com/ClickHouse/ClickHouse/issues/35096)
  5. AggregatingTransform::work->consume 预聚合阶段,通过调用aggregator.executeOnBlock(...)函数执行预聚合
  6. AggregatingTransform::work->initGenerate 扩展pipeline,合并阶段,当预聚合阶段结束(上游通道关闭,或者聚合行数到达了设定的上限),通过扩展pipeline替换上游节点,然后等待合并数据。这里根据aggregator在预聚合过程中是否因为限制内存使用而将数据写到磁盘文件,扩展的节点是不同的
    1. 如果没有写到磁盘文件,扩展ConvertingAggregatedToChunksTransform节点
    2. 否则扩展SourceFromNativeStream,GroupingAggregatedTransform,MergingAggregatedBucketTransform,SortingAggregatedTransform
    3. GroupingAggregatedTransform 将single_level block转化为two_level block,并按照block_num进行组合,然后交给MergingAggregatedBucketTransform进行合并操作
    4. MergingAggregatedBucketTransform 进行合并操作,因为MergingAggregatedBucketTransform可以有多个,因此合并阶段也可以是并行的
    5. ConvertingAggregatedToChunksTransform 如果预聚合数据是two_level block,则扩展节点进行并行合并,然后在本节点进行sort;否则直接在本节点合并
  7. ExecutingGraph::updateNode->AggregatingTransform::expandPipeline AggregatingTransform构造了一个新的input_port,和扩展节点中的最下游节点的output_port连接起来,下次执行prepare()函数的时候,获取的input_port是新构造的那个,这里实际上等价于切换了上游数据流,切换完成。从预聚合阶段切换到合并阶段的,这部分内容也是典型的运行时扩展Pipeline的案例:需要根据计算时的数据动态的判断之后需要执行的节点类型和结构
  8. Aggregator::Aggregator 记录预聚合前的内存使用,作为是否将预聚合数据写入磁盘文件的依据,每个聚合函数有个对应的State对象,该对象作为预聚合过程中内部数据的存储点,一个sql语句中可以有多个聚合函数,ClickHouse中是将多个聚合函数的State对象分配在一整块内存上的,因此,这里需要计算每个State对象的大小和偏移量,根据键类型选择合适的哈希表。
  9. Aggregator::chooseAggregationMethod 这里根据“grouping key” 的数量、特点(lowCardinality、isNullable、isFixedString)等性质,选择合适的哈希表类型,默认选择serialized类型的哈希表,这个哈希表的键就是将多个“grouping key”拼接,针对个别类型的哈希表,构造cache
  10. Aggregator::executeOnBlock 执行预聚合的接口
    1. initDataVariantsWithSizeHint result(类型AggregatedDataVariants)是一个out型参数,实际的哈希表也是在这个对象中,这里会执行初始化操作,即根据aggregator选择的哈希表类型来初始化对应的哈希表
    2. materialize columns ClickHouse中有些列不能在聚合操作中直接使用,比如Const Column、Sparse Column等。这里对“grouping key”中这些列做了具化处理(即格式转换为普通格式
    3. prepareAggregateInstructions 这个函数内部是聚合函数的参数拼接的过程,聚合函数的参数,根据名字找到对应的列数据
    4. executeWithoutKeyImpl / executeImpl 执行具体的聚合操作
    5. convertToTwoLevel和writeToTemporaryFile 聚合操作之后,判断是否要将单层哈希表转换为双层,以及是否将数据写到磁盘文件中。
  11. executeImpl->Aggregator::executeImplBatch 遍历需要聚合的行,对每一行我们计算其哈希表中的键,如果这个键在哈希表中不存在,则通过aggregates_pool->alignedAlloc申请一个内存块,并在内存块上初始化每个聚合函数的State对象,遍历聚合函数,依次执行预聚合操作
    1. 如果聚合key是8 bit, addBatchLookupTable8 数据根据key放到4*256的空间里面,256就是uint8的空间,4可能是因为前面把数据处理成这样。然后调用add, merge这个标准流程处理数据
    2. aggregates_pool->alignedAlloc申请内存给createAggregateStates创建新的State对象,这里是可以用上jit的
    3. addBatch 使用聚合函数做聚合

函数调用向量化实现

  1. ClickHouse源码笔记3:函数调用的向量化实现
  2. SELECT a, abs(b) FROM test 对应的stream ExpressionBlockInputStream->ExpressionBlockInputStream(生成abs(b),然后删除列b替换成abs(b))->TinyLogBlockInputStream 这块的实现已经改了,看ExpressionTransform
  3. ExpressionActions::executeAction-> FunctionUnaryArithmetic.executeImpl-> UnaryOperationImpl::vector-> AbsImpl.apply 这就是一个完美的向量化优化代码,没有任何if, switch, break的分支跳转语句,for循环的长度也是已知的

select where的实现

  1. ClickHouse源码笔记4:FilterBlockInputStream, 探寻where,having的实现
  2. FilterTransform::FilterTransform 寻找filter column的位置,判断是不是常量列
  3. FilterTransform::doTransform 执行filter表达式,生成FilterDescription并通过其过滤其他的每个列
  4. ColumnVector<T>::filter 对于每个列,遍历bool数组,然后将合法数据塞进一个新的列之中,最终新的列替换旧的列,就完成了一列数据的过滤。TargetSpecific::AVX512VBMI2::doFilterAligned也可以打开avx512指令集优化

排序实现

  1. ClickHouse源码笔记6:探究列式存储系统的排序
  2. PartialSorting->MergeSorting 两个重要的pipeline
  3. Partialsortingtransform::Transform->sortBlock 对单个block排序,相当于处理一个batch
  4. getBlockSortPermutationImpl->ColumnVector<T>::getPermutation->ColumnVector<T>::permute 先通过单列排序,拿到每一列在排序之后的IColumn::Permutation perm。然后Block之中的每一列都利用这个perm, 生成一个新的排序列,替换旧的列之后,就完成Block的排序了
  5. ColumnVector<T>::getPermutation 如果有limit做std::partial_sort,如果数字类型考虑使用基数排序,否则使用默认的快排
  6. ColumnVector<T>::permute 简单的根据map(排序列和被排序列之间的位置索引)生成一个新的排序后的列
  7. MergeSortingTransform::consume 从source stream读取到memory block,如果超出限额就持久化到文件做外部排序,增加BufferingToFileTransform,MergingSortedTransform两个processor
  8. 排序算法因为数据依赖没做太多simd优化,有些技巧估计还没普及

分布DDL

  1. ClickHouse 分布式DDL 执行原理剖析
  2. InterpreterAlterQuery::executeToTable->executeDDLQueryOnCluster 判断是否是分布式ddl
  3. DDLWorker::enqueueQuery 往zk的distributed_ddl目录提交queue-000xxx记录,创建ddl task和子目录(/distributrd_ddl/queue-000xxx, /distributed_ddl/queue-000xxx/active, /distributed_ddl/queue-000xxx/finished)
  4. getDistributedDDLStatus 阻塞等待各个节点工作完成,扫描zk目录(/distributed_ddl/queue-000xxx/finished)
  5. DDLWorker::runMainThread->scheduleTasks->processTask->tryExecuteQueryOnLeaderReplica 需要leader角色的抢leader锁
  6. replicated_storage->getStatus ->tryExecuteQuery 获得副本锁,提交非分布式但可能多replica的ddl任务-副本间任务({zk_path}/log/log-00000xxx {zk_path}/mutations/0000xxx)
  7. waitForLogEntryToBeProcessedIfNecessary 等待副本间任务都完成
  8. StorageReplicatedMergeTree::queueUpdatingTask->pullLogsToQueue 同步/{zk_path}/log/log-00000xxx到自己的副本目录/{zk_path}/replicas/{replica}/queue/queue-0000yyy
  9. BackgroundJobsAssignee::threadFunc->scheduleDataProcessingJob->executeLogEntry->MergeTreeBackgroundExecutor<Queue>::threadFunction()->task->executeStep() 每个副本拉取任务执行
  10. tryFinalizeMutations 更新表示完成的zk状态

join原理

本地join

  1. Nested Loop最基础的join方式,双重循环性能差
  2. Block Nested-Loop Join 使用join buffer,每次外层循环读取多条数据放进buffer,然后在内层一次执行完这多条外层数据的比较
  3. Sort Merge Join 外层,内层的数据先根据join key排序,然后有序merge,优点是使用内存可控
  4. hash join,小表放进hash,然后大表逐条记录hash lookup,如果小表无法放进内存就需要dump到文件,快但耗内存。

分布式join

  1. shuffle join 根据join key做hash把数据拆到不同的计算节点,这样每个节点都是local join,然后合并不同节点的结果
  2. broadcast join 小表广播,把小表复制到所有大表数据节点形成local join
  3. co-located join 数据已经根绝join key分好片了,天然的local join

Clickhouse的join

  1. clickhouse的本地join先尝试hash join,内存超出就merge join
  2. ClickHouse Join为什么被大家诟病? 
  3. 分布式join是两阶段执行的模型,协调者转发请求到多个worker节点,收集worker节点的计算结果做汇总计算
  4. 普通分布式Join,协调者分发左表到n个worker节点,worker节点去其他n个节点去拿右表的全部数据,形成n*n数据传输读取,ck未能真正实现shuffle
  5. global join,为了避免每个worker重复拉取一遍右表数据,改成由协调者拉取一遍右表数据然后传输到worker节点,能避免右表的重复计算,但是有大量的重复传输
  6. Colocate JOIN,可以在生成表的时候就按join key方式分片,然后用一个分布式表join一个local表,实现的效果就是colocate join 
  7. 两阶段模型,第二阶段如果计算密集比如count distinct协调者会成为瓶颈
  8. 不支持Shuffle,如果右表是大表,没法做多节点并行优化
  9. 有些场景过滤条件下推做的不好需要人工改sql优化
  10. 没有runtime filter,runtime filter在右表很小的时候可以作为filter提前应用到左表避免很多data load和hash probe

hash join流程

  1. ClickHouse之本地Join(hash join部分)源码分析(doing)
  2. FillingRightJoinSideTransform.work->HashJoin::addJoinedBlock->joinDispatch->insertFromBlockImpl->insertFromBlockImplTypeCase 使用右表构建hashtable,数据结构看RightTableData.maps
  3. JoiningTransform::transform->HashJoin::joinBlock crossjoin直接笛卡尔积,right或者full join物化一下稀疏或者常量列,然后转到joinBlockImpl
  4. joinBlockImpl switchJoinRightColumns返回一个row_filter,这个row_filter记录了左表block中哪些行需要保留,joinRightColumns查map看是否保留,block.insert(added_columns.moveColumn(i)) 将added_columns拼接到左表的block中,block.safeGetByPosition(i).column->filter(row_filter, -1) 这里执行真正的过滤操作

两阶段分布式执行

查询

  1. ClickHouse 分布式DDL 执行原理剖析
  2. StorageDistributed::read sql重写主要是替换库名表名,ClusterProxy::executeQuery主要是修改和设置一些配置 SelectStreamFactory::createForShard 决定选择本地副本还是远程副本
  3. pool->getManyChecked 这里会去连接池中获取远端的entry,entry中包含着请求其他server的必要信息 ConnectionPoolWithFailover::getManyImpl->getMany 真正获取entry并进行排序的过程
  4. 简单说就是遍历所有shard,然后选择合适的方式连接每个shard,生成对应的stream
  5. 生成stream的逻辑在emplace_local_stream,emplace_remote_stream等等几个匿名函数,实际读取数据的逻辑也在里面
  6. ReadFromRemote::initializePipeline会生成lazily_create_stream,里面会用到ConnectionPoolWithFailover::getManyImpl

写入

  1. 同步写入是指数据直写入实际的表中,而异步写入是指数据首先被写入本地文件系统,然后发送到远端节点。异步写是主流
  2. 主要逻辑就是根据sharding_key进行拆分,然后写到不同的位置,有目录加锁逻辑

一些优化点

groupBitmap替换uniqExact

  1. 为什么groupBitmap比uniqExact快?
  2. 该优化在select countDistinct的时候使用,高基维效果明显
  3. 关键区别在merge操作,只有分布式场景需要merge。分布式场景先在数据节点执行部分聚合,然后都传到计算节点做合并然后排序。决定是否做分布式聚合的代码在 Aggregator::mergeOnBlock

基于规则的优化

  1. ClickHouse SQL 的十项优化规则 多数优化可以在前面的ast重写的prewhere部分找到

一些其他系统的sql优化

  1. SQL 子查询的优化 关联子查询join下推,算子上提
  2. 子查询相关的优化 tidb的一些子查询优化,相对简单
  3. Max/Min消除,在索引列上的时候转换成order by limit 1就只用取一行
  4. 谓词下推,下推到理数据源尽量近的地方,很好理解
  5. Join Reorder 算法简介 Join Reorder,多表join,根据表的数据量分布选择合适的join顺序

jit原理

  1. JIT in ClickHouse
  2. USE_EMBEDDED_COMPILER 启动jit编译 ExpressionActions::ExpressionActions->ActionsDAG::compileExpressions->ActionsDAG::compileFunctions->compileFunction 这里实际生成llvm的ir,在子类的compileImpl里面补全更多细节(搜关键字llvm::IRBuilderBase) 一个很容易理解的例子是AggregateFunctionCount
  3. llvm生成ir的用法可以参考链接里面的例子
  4. SELECT a + b * c + 5 -> plus(plus(a, multiply(b, c)), 5) 会编译压平成一个function,尽量simd
  5. aggregate functions 有很多虚函数,有不少wrapper,也是通过jit压平。多个列上的聚合函数也可以合并成一个大聚合函数。
  6. 对于排序,如果要比较多个列就是多个虚函数调用,这里也可以jit压平成一个函数提升性能
  7. CHJIT::compileModule->JITCompiler.compile->llvm::legacy::PassManager 代码生成,结果存储在llvm::MemoryBuffer
  8. llvm::RuntimeDyld->loadObject->resolveRelocations 动态链接器符号解析,重定位
  9. compiled_module.function_name_to_symbol.emplace 维护<function name>到生成的符号的映射
  10. module_identifier_to_memory_manager[current_module_key] 维护函数(Module)到MemoryManager的映射

bitmap的一些细节

  1. createAggregateFunctionBitmap<AggregateFunctionGroupBitmapData> 创建bitmap
  2. SmallSet roaring::Roaring64Map roaring::Roaring 三种实现
  3. roaring_array_s 32位核心数据结构,维护container(数据桶), key(桶编号), typecodes(桶类型)等数据结构
  4. roaring_bitmap_and->container_and->bitset_bitset_container_intersection->bitset_container_and_justcard->_mm256_and_si256 and流程
  5. Roaring64Map 会重用32位的Roaring结构,RoaringBitmap64是由一系列RoaringBitmap32表示。实现方式有很多种,一种比较通用的做法用map存储,是把前32位存成key,value是后32所对应的RoaringBitmap32。前32位key的做法和32位roaring的高16位做法类似

action的一些细节

  1. action类型等等描述(ActionDAG.h),描述计算结构的一种通用结构,最有用的应该是function类型的actionType
  2. action创建是ConstInDepthNodeVisitor<ActionsMatcher, true> visit产生
  3. 通过ExpressionActions挂着的,貌似会挂到很多地方
  4. 实际执行在ExpressionActions::execute
  5. 执行点案例MergeTreeRangeReader::MergeTreeRangeReader

数据存储格式




2023年10月3日星期二

SIMD相关

原理

  • Add ALUs to increase compute capability
  • 应该就是增加计算单元提升算力,缺点可能是要占用芯片面积就没法加其他东西了,毕竟使用场景有限,所以avx512会因为体积和能耗被某些人鄙视。

字符查找

  •  Looking for an index of an element in array via SIMD. A fast way
  • 16个8位字符打包成128位__m128i ARR = _mm_setr_epi8(1,2,3...)
  • 设置被比较数字的vector__m128i N = _mm_set1_epi8(3)
  • 比较并得到按bit的mask,__m128i cmp = _mm_cmpeq_epi8(ARR, N); int mask = _mm_movemask_epi8(cmp);
  • _tzcnt_u32返回尾部最小有效零位的个数,也就是第一个满足条件的字符的位置

基数排序

  • Bitwise MSB Radix Sort on AVX-512
  • 基于bit的MSB基数排序,从高位开始每bit做比较像快排一样做左右划分,这样不需要额外的空间去做基数归类
  • simd排序有个麻烦的点是怎样搬动比较key之外的数据,这里的做法是看成kv并打包在一起,如果v是64位指针kv就是128位,512位的vector可以放4个,如果v是32位的位置kv是64位放8个
  • 每次拿vector中所有key中的一位进行比较得到一个mask, testAndCount(bitMaskVec, keyPayload, sortBits, popcnt)
  • 然后关键点是mask_compressstoreu这个指令,通过sortBits,把keyPayload符合条件的位对应的数据写到d + writePos[0],这样就实现了搬动数据
  • 但是感觉一位一位排序似乎效率有点低,把simd的优势又还回去了?

排序网络(Bitonic Sorting Network)

  • Bitonic_sorter 
  • 双调排序,适合并行度比较大的场景,双调序列是先单调递增然后单调递减的序列,将双调序列等分然后两个子序列一一比较得到min和max序列又都是双调序列。排序算法是先从原子一级级组成一整个双调序列,然后再一级级拆散最后就是排序好的
  • 复杂度是n*logn^2,但并行度很好
  • simd算法详见A Novel Hybrid Quicksort Algorithm Vectorized using AVX-512 on Intel Skylake

快速排序

  • Fast Quicksort Implementation Using AVX Instructions
  • 假设一次操作4个数,VBROADCASTSS (pivot), P (设置pivot)
  • VMOVDQU (in), D (取数组成向量)
  • VPCMPGTD D, P, C (向量和pivot比较)
  • VMOVMSKPS C, r (生成按bit的mask)
  • SHL $4, r VPERMILPS permTableLesser(, r), D, L VPERMILPS permTableGreater(, r), D, G (用mask查表得到两个向量内位置list,然后用vpermilps指令从源向量取数到目标寄存器,完成了partition这一步)
  • permTableLesser这种表是预先构建好的,因为mask寄存器只有16种可能对应的位置都是确定的
  • POPCNT r, r0 MOV $4, r1 SUB r0, r1 (计算partition后两边各有几个数)
  • VMOVDQU L, (Lptr) VMOVDQU G, (Gptr) (寄存器写回内存比如数组里面)
  • LEA (Lptr, r0, 4), Lptr LEA (Gptr, r1, 4), Gptr (移动数组指针)
  • 256位的AVX2增加了VPGATHERQQ指令,这样源向量里面可以存kv然后VPGATHERQQ只取出key来比较
  • AVX512增加了VCOMPRESSPS指令,直接通过mask写无需查表
  • 存kv始终是个痛点,放一起会减少向量里面的记录数,A Novel Hybrid Quicksort Algorithm Vectorized using AVX-512 on Intel Skylake 给的一个解决办法是value单独放一个等长的数组,根据mask partition的时候也给value数组操作一次