2017年10月18日星期三

其它技术备忘

G1 gc

1.garbage first是指内存分很多小区域,每个区域可以标记垃圾比例,回收的时候找比例高的回收
2.为啥能控制停顿时间?能自动调整参数是一个原因,还有一个原因是能控制回收分区的数量
3.为啥能解决碎片?同样是因为分区,回收的时候是把一些分区挪移到其它的分区,同时做了压缩。核心创新还是分区

java性能权威指南的几个点

1.TLAB线程本地对象分配区适当调整可能会优化对象分配速度
2.linux huge使用加快page寻址,避免page换出
3.默认开启指针压缩但最大只能支持到32g,一般内存不要超过这个值
4.java8 parallelStream 会做自动并行化,使用恰当可以优化性能
5.@Contended可以对齐cache line,而不用自己去推算cpu cache line大小

关于mmap的性能

1.mmap快在不用每次读写都拷贝内存和系统调用
2.写的时候尤其是顺序写,如果每次写入的大小比较大,内存拷贝和系统调用的优势在实际io面前就没那么大了,所以mmap和fwrite的性能实际差距不太大
3.一般的建议是append的方式写文件用fwirte,因为mmap resize需要反复撤销和创建mmap这个成本较高没必要。不过如果一开始就知道文件大小而且很大的情况下用mmap也可以。如果读比较多就比较适合用mmap。比较经典的例子就是kafka和metaq,kafka只有索引用了mmap而metaq是索引和log文件都用了mmap。可以看看linus大神怎么说https://stackoverflow.com/questions/35891525/mmap-for-writing-sequential-log-file-for-speed

io_uring

1.简单描述就是有submission queue和completion queue两个队列是用户态和内核态共享,所以用户态提交io请求由内核线程负责处理sq并返回cq,整个过程没有系统调用


2017年10月9日星期一

kafka精确一次投递和事务消息学习整理

概述

今年6月发布的kafka 0.11.0.0包含两个比较大的特性,exactly once delivery和transactional transactional messaging。之前一直对事务这块比较感兴趣,所以抽空详细学习了一下,感觉收获还是挺多的。

对这两个特性的详细描述可以看这三篇文档,
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

需求场景


精确一次投递

消息重复一直是消息领域的一个痛点,而消息重复可能发生于下面这些场景
1.消息发送端发出消息,服务端落盘以后因为网络等种种原因发送端得到一个发送失败的响应,然后发送端重发消息导致消息重复。  
2.消息消费端在消费过程中挂掉另一个消费端启动拿之前记录的位点开始消费,由于位点的滞后性可能会导致新启动的客户端有少量重复消费。

先说说问题2,一般的解决方案是让下游做幂等或者尽量每消费一条消息都记位点,对于少数严格的场景可能需要把位点和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费位点,然后更新下游数据的时候用消费位点做乐观锁拒绝掉旧位点的数据更新。

问题1的解决方案也就是kafka实现的方案是每个producer有一个producer id,服务端会通过这个id关联记录每个producer的状态,每个producer的每条消息会带上一个递增的sequence,服务端会记录每个producer对应的当前最大sequence,如果新的消息带上的sequence不大于当前的最大sequence就拒绝这条消息,问题1的场景如果消息落盘会同时更新最大sequence,这个时候重发的消息会被服务端拒掉从而避免消息重复。后面展开详细说一下这个解决方案。

事务消息

1.最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见
2.producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务
3.kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交
4.producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务
5.流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别

一个比较典型的consume-transform-produce的场景像下面这样
public class KafkaTransactionsExample {
  
  public static void main(String args[]) {
    KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
    KafkaProducer producer = new KafkaProducer<>(producerConfig);

    producer.initTransactions();
     
    while(true) {
      ConsumerRecords records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        producer.beginTransaction();
        List> outputRecords = processRecords(records);
        for (ProducerRecord outputRecord : outputRecords) {
          producer.send(outputRecord);
        }
        sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
        producer.endTransaction();
      }
    }
  }
}

几个关键概念和推导

1.因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举和failover上面类似。
2.事务管理中事务日志是必不可少的,kafka使用一个内部topic来保存事务日志,这个设计和之前使用内部topic保存位点的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态。
3.因为事务存在commit和abort两种操作,而客户端又有read committed和read uncommitted两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message。
4.producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联,这个就是TransactionalId,一个producer挂了,另一个有相同TransactionalId的producer能够接着处理这个事务未完成的状态。注意不要把TransactionalId和数据库事务中常见的transaction id搞混了,kafka目前没有引入全局序,所以也没有transaction id,这个TransactionalId是用户提前配置的。
5. TransactionalId能关联producer,也需要避免两个使用相同TransactionalId的producer同时存在,所以引入了producer epoch来保证对应一个TransactionalId只有一个活跃的producer epoch

重要的类图




架构和数据流

事务消息

官方文档的数据流组件图

上图中每个方框代表一台独立的机器,图下方比较长的圆角矩形代表kafka topic,图中间的两个角是圆的的方框代表broker里面的逻辑组件,箭头代表rpc调用。

接下来说一下事务的数据流,这里基本按照官方文档的结构加上我自己看代码的一点补充

1.首先producer需要找到transaction coordinator。TransactionManager.lookupCoordinator
private synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
    switch (type) {
        case GROUP:
            consumerGroupCoordinator = null;
            break;
        case TRANSACTION:
            transactionCoordinator = null;
            break;
        default:
            throw new IllegalStateException("Invalid coordinator type: " + type);
    }

    FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
    enqueueRequest(new FindCoordinatorHandler(builder));
}

2.获取producer id,producer id是比较重要的概念,精确一次投递需要producer id+sequence防止重复投递,事务消息也需要保存transactional id和producer id的对应关系。客户端调用KafkaProducer.initTransactions的时候会向coordinator请求producer id,TransactionManager.initializeTransactions
public synchronized TransactionalRequestResult initializeTransactions() {
    ensureTransactional();
    transitionTo(State.INITIALIZING);
    setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
    this.sequenceNumbers.clear();
    InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
    InitProducerIdHandler handler = new InitProducerIdHandler(builder);
    enqueueRequest(handler);
    return handler.result;
}

coordinator端处理逻辑在TransactionCoordinator.handleInitProducerId流程比较复杂,首先如果对应的transactional id没有产生过producer id会找producerIdManager生成一个
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).right.flatMap {
  case None =>
    val producerId = producerIdManager.generateProducerId()
    val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
      producerId = producerId,
      producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
      txnTimeoutMs = transactionTimeoutMs,
      state = Empty,
      topicPartitions = collection.mutable.Set.empty[TopicPartition],
      txnLastUpdateTimestamp = time.milliseconds())
    txnManager.putTransactionStateIfNotExists(transactionalId, createdMetadata)

  case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
}

producer id需要全局唯一,有点类似于tddl sequence的生成逻辑,ProducerIdManager.generateProducerId会一次申请一批id然后在zk上面保存状态,本地每次生成+1,如果超出了当前批次的范围就去找zk重新申请

拿到了producer id接下来处理事务状态,保证之前的事务状态能够处理完毕,该提交的提交,该回滚的回滚。
TransactionCoordinator.prepareInitProduceIdTransit处理producer id的变化比如开始一个新的事务可能会增加producer epoch,也可能生成新的producer id
case PrepareAbort | PrepareCommit =>
  // reply to client and let it backoff and retry
  Left(Errors.CONCURRENT_TRANSACTIONS)

case CompleteAbort | CompleteCommit | Empty =>
  val transitMetadata = if (txnMetadata.isProducerEpochExhausted) {
    val newProducerId = producerIdManager.generateProducerId()
    txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds())
  } else {
    txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds())
  }

  Right(coordinatorEpoch, transitMetadata)

case Ongoing =>
  // indicate to abort the current ongoing txn first. Note that this epoch is never returned to the
  // user. We will abort the ongoing transaction and return CONCURRENT_TRANSACTIONS to the client.
  // This forces the client to retry, which will ensure that the epoch is bumped a second time. In
  // particular, if fencing the current producer exhausts the available epochs for the current producerId,
  // then when the client retries, we will generate a new producerId.
  Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())

最后如果之前的事务处于进行中的状态会回滚事务
handleEndTransaction(transactionalId,
  newMetadata.producerId,
  newMetadata.producerEpoch,
  TransactionResult.ABORT,
  sendRetriableErrorCallback)

或者就是新事务,往事务日志里面插一条日志(对应数据流图中的2a)
txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback)

3.客户端调用KafkaProducer.beginTransaction开始新事务。这一步相对简单,就是客户端设置状态成State.IN_TRANSACTION

4.consume-transform-produce过程,这一步是实际消费消息和生产消息的过程。
(1)客户端发送消息时(KafkaProducer.send),对于新碰到的TopicPartition会触发AddPartitionsToTxnRequest。服务端对应的处理在TransactionCoordinator.handleAddPartitionsToTransaction,主要做的事情是更新事务元数据和记录事务日志(对应数据流图中的4.1a)。在事务中记录partition的作用是后面给事务每个partition发送提交或者回滚标记时需要事务所有的partition。
(2)客户端通过KafkaProducer.send发送消息(ProduceRequest),比较早的kafka版本增加了PID,epoch,sequence number等几个字段,对应数据流图中的4.2a
(3)客户端调用KafkaProducer.sendOffsetsToTransaction保存事务消费位点。服务端的处理逻辑在TransactionCoordinator.handleAddPartitionsToTransaction,和4.1基本是一样的,不同的是4.3记录的是记录消费位点的topic(GROUP_METADATA_TOPIC_NAME)。
(4)4.3调用的后半部分会触发TxnOffsetCommitRequest,通过数据消息的方式把消费位点持久化到GROUP_METADATA_TOPIC_NAME(__consumer-offsets)这个topic里面去,对应数据流图中的4.4a。
客户端发起逻辑在AddOffsetsToTxnHandler.handleResponse
if (error == Errors.NONE) {
    log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
            builder.consumerGroupId());

    // note the result is not completed until the TxnOffsetCommit returns
    pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
    transactionStarted = true;
}
因为需要处理可见性相关的逻辑,服务端事务消费位点和普通消费位点提交的处理逻辑稍有不同,调用GroupCoordinator.handleTxnCommitOffsets而不是handleCommitOffsets。

5.结束事务需要调用KafkaProducer.commitTransaction或者KafkaProducer.abortTransaction
(1)首先客户端会发送一个EndTxnRequest,而服务端由TransactionCoordinator.handleEndTransaction处理。

handleEndTransaction首先会做一个可能的状态转换让事务进入预提交或者预放弃阶段
else txnMetadata.state match {
  case Ongoing =>
    val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
      PrepareCommit
    else
      PrepareAbort
    Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))

接下来会在事务日志里面记录PREPARE_COMMIT或者PREPARE_ABORT日志,对应数据流图中的5.1a
txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)

再接下来会往用户数据日志里面发送COMMIT或者ABORT的Control Message,最后往事务日志里面写入COMMIT或者ABORT,才算完成了事务的提交过程。这个过程是用回调的方式组织起来的,代码的流程是TransactionStateManager.appendTransactionToLog->TransactionMarkerChannelManager.addTxnMarkersToSend->TransactionStateManager.appendTransactionToLog

(2)往用户数据日志里面发送COMMIT或者ABORT的Control Message的过程,对应数据流图中的5.2a
发起点在回调方法sendTxnMarkersCallback,这个方法首先会做转台转换让事务进入CompleteCommit或者CompleteAbort状态
case PrepareCommit =>
  if (txnMarkerResult != TransactionResult.COMMIT)
    logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
  else
    Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
case PrepareAbort =>
  if (txnMarkerResult != TransactionResult.ABORT)
    logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
  else
    Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))

最后会往事务相关的每个broker发送WriteTxnMarkersRequest,如果事务包含消费位点也会往__consumer-offsets所在的broker发请求。 broker端的处理在KafkaApis.handleWriteTxnMarkersRequest会把control message写入日志
replicaManager.appendRecords(
  timeout = config.requestTimeoutMs.toLong,
  requiredAcks = -1,
  internalTopicsAllowed = true,
  isFromClient = false,
  entriesPerPartition = controlRecords,
  responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))

(3)事务日志写入最终的COMMIT或者ABORT日志,对应数据流图的5.3,这一步完成了一个事务就算彻底完成了。
发起点在回调方法appendToLogCallback

精确一次投递



大致列一下流程中的关键节点

1.在客户端每次发送消息之前会检查是否有producerId如果没有会找服务端去申请,Sender.run
if (transactionManager != null) {
    if (!transactionManager.isTransactional()) {
        // this is an idempotent producer, so make sure we have a producer id
        maybeWaitForProducerId();
    } 
服务端会在TransactionCoordinator.handleInitProducerId处理,前面事务消息提到过

2.在生成消息内容的时候(RecordAccumulator.drain)会获取当前的的sequenceNumber(TransactionManager.sequenceNumber)放到消息体里面。而sequenceNumber的自增是在发送上一批消息返回是触发的(Sender.handleProduceResponse)。

3.broker实际写入消息之前(Log.append)才会对sequenceNumber进行校验,校验的具体逻辑在ProducerStateManager.validateAppend

其他一些技术点

1.sequenceNumber可以设计成每个producer唯一或者更细粒度的对每个topic-partition唯一,topic-partition唯一的好处是对于每个topic-partition sequenceNumber可以设计成连续的,这样broker端可以做更强的校验,比如检查丢消息,kafka使用的就是细粒度的方法,发现sequenceNumber不连续的时候会抛异常OutOfOrderSequenceException

2.发消息(KafkaProducer.doSend)是个异步的过程,但同时提供Future返回值使得在必要的时候可以把异步变成同步等待。kafka也实现了攒消息批量发送的能力(RecordAccumulator.append),攒消息的存放方式是一个大hash map,key是topic-partition,消息实际刷出和发送在一个单独的线程中执行,调用Sender.sendProducerData。被刷出的消息的判定在RecordAccumulator.ready,主要依据是消息集是否已满或者是否超时。

3.Consumer消费的时候怎样控制事务可见性呢?一个比较直观的方法就是先把事务消息buffer起来,然后遇到提交或者回滚标志的时候做相应的处理,kafka处理的更巧妙一些。首先是不能读到未提交的事务的控制,kafka引入了lastStableOffset这个概念,lastStableOffset是当前已经提交的事务的最大位点。在ReplicaManager.readFromLocalLog里面有控制,
val initialHighWatermark = localReplica.highWatermark.messageOffset
val lastStableOffset = if (isolationLevel == IsolationLevel.READ_COMMITTED)
  Some(localReplica.lastStableOffset.messageOffset)
else
  None

// decide whether to only fetch committed data (i.e. messages below high watermark)
val maxOffsetOpt = if (readOnlyCommitted)
  Some(lastStableOffset.getOrElse(initialHighWatermark))
else
  None
...
val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel)

这样未提交的事务对客户端就不可见了

4.还有一个需求是要识别并且跳过那些在aborted事务内的消息,这些消息可能和非事务消息混在一起。kafka读消息的返回信息中会带上本批读取的消息中回滚事务列表来帮助客户端跳过。
case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
                         records: Records,
                         firstEntryIncomplete: Boolean = false,
                         abortedTransactions: Option[List[AbortedTransaction]] = None)

回滚事务列表是在读取消息日志(Log.read)的过程中撸的
return isolationLevel match {
  case IsolationLevel.READ_UNCOMMITTED => fetchInfo
  case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
}
那接下来的问题是kafka是如何快速拿到回滚事务列表的呢?kafka为这件事做了一个文件索引,文件后缀名是'.txnindex',相关的管理逻辑在TransactionIndex。

5.broker端对于producer的状态管理,broker需要记录producer对应的最大sequenceNumber,epoch之类的信息。相关逻辑是放在ProducerStateManager里面的,broker每次写入消息的时候(Log.append)都会更新producer信息(ProducerStateManager.update)。由于只有当有消息写入的时候producer state才会被更新,所以当broker挂掉的时候producer的状态需要被持久化,kafka又弄了一个文件'.snapshot'来持久化producer信息。

设计上的体会

感觉kafka在设计上概念的统一和架构的连贯上做的特别好,比如producerId的引入把精确一次投递和事务消息都给串联起来了。

印象更深刻的例子是kafka早先几个版本依次推出了几个特性,
1.把位点当做普通消息保存
2.加入了消息清理机制,只保留key最新的value
3.加入了broker端的coordinator解决惊群和脑裂问题

然后这几个特性在事务消息这块都用上了,首先把位点当普通消息保存在概念上统一了消息发送和消费,同时消息同步也成了broker之间同步状态的基础机制这样就不用再弄一套状态同步机制了,不过这样做的缺点是只有写消息才能同步broker状态某些特殊情况可能有点小麻烦。利用消息队列保存状态的一个毛病是比较浪费资源,而消息清理机制恰好解决了这个问题。最后是broker端的coordinator机制可以用在consumer group协调者也可以用在事务协调者上面。这种层次递进的特性累加真是相当有美感而且感觉是深思熟虑的结果。

参考资料

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
https://medium.com/@jaykreps/exactly-once-support-in-apache-kafka-55e1fdd0a35f




2017年1月14日星期六

nodejs源码探险

动机

之所以称之为探险是因为我是一名java程序员,对c和c++的了解仅限于大学时学的一点点知识,而javascript虽然用过一些,但深入程度有限,nodejs的源码组合了c,c++,js对我来说实在难度不小,怎奈我对特别酷的技术没什么抵抗力,所以还是入了坑。而学习nodejs代码的过程也是充满了挫折,断断续续持续了一年的时间而且中途差点就走不下去而放弃了,现在总算有点弄明白了,所以写下这篇文章算是付出的这些艰辛的一点结晶吧。

确立目标

学习一个框架的源代码之前总得对这个框架有所了解,所以先找了本node.js in action大概的翻了一遍,从这本书得到的对node的印象大概是1.node和前端的js没什么区别,但是有个require是前端js没有的 2.node可以写server端程序,可以操作文件,操作网络 3.node是单线程的 4.node的风格是异步,io风格也是异步回调 5.node开发web应用确实很简单方便,而且框架不少。 之后又在网上零碎的了解了一些node知识1.node有三大块--nodejs本身,libuv,v8引擎 2.node的tick, setTimeout行为比较有趣

到这里我学习nodejs的目标就大致确定了
1.学习nodejs本身,看看怎么整合js,v8和libuv
2.学习libuv,了解一下linux下面的网络,io编程,同时也了解一下epoll因为用java nio的时候只能了解到nio api这层就止步了,比较想知道下面发生的事情
3.学习v8引擎,看看这牛逼的引擎为啥这么快,gc和即时编译这些都是怎么做的(事后证明这个目标太贪心了,也导致差点玩下去了)

在上面的目标下要重点了解的技术点也可以列出来
1.nodejs的require是怎么实现的
2.nodejs的js和libuv的c是怎么交互的
3.nextTick和setTimeout这些是怎么实现的
4.libuv的异步io是怎么实现的
5.nodejs里面的domain能捕获相关的异常看着很酷,怎么实现的(这个点还是读源代码过程中发现的)
6.epoll为啥那么牛逼
7.了解c/c++的编程风格

准备工作

玩源代码首先要能debug,所以一顿google把准备工作做足了

调试js端代码

这个相对简单一点,
1.npm install -g node-inspector
2.node --debug-brk /home/huying/development/nodejs/huying-code/basicHttp.js
3.启动node-inspector

调试c和c++代码

这一块就没有官方攻略了,只能自己diy,虽然传说中vim或者emacs+gdb很牛逼,但是就我自己emacs+jdb的体验来文件编辑器做debug操作起来并不舒服,像树形展开这类操作没有鼠标点来方便,所以还是选了比较全能的eclipse

后面的步骤就直接列了
1.去官网下载node源代码,我下的是node-v4.4.7
2.打开源码的debug开关 ./configure --prefix /home/huying/development/nodejs/node-debug-version --gdb 或者直接 ./configure --debug
3.修改Makefile,打开BUILDTYPE=Debug
4.make
5.eclipse首先在源码上创建c/c++项目,然后用node_g创建一个运行项就可以debug了(gdb deubg可以直接gdb --args ./node_g myscript.js)
6.碰到glibc这样看不到源码的,可以先apt-get install源码,然后在source这里用创建目录映射的方式去关联源码

查看v8变量

这里我踩了一个大坑,每次debug进入v8的时候变量被v8包装一下就没法看了,在eclipse里面v8的变量都是Local<String>这样的东东,无论怎么展开都没法看到里面的String长啥样。。。
志在了解v8引擎的我自然不甘心,各种google之下发现可以给gdb开pretty print,可是stl的pretty print好找,v8的pretty print脚本真不多而且看起来比较靠谱的还是用scheme写的。。。这里折腾了不少时间都没看到效果,而且平时只能拿出零碎的时间玩node所以感觉搞不下去了。。。

后来不得已调整了目标,放弃了了解v8,其实了解v8这个目标订的太大即使debug好使,可能陷入其中也会耽误了解node js这个基本目标。但是完全看不到v8里面的变量内容对理解nodejs的某些流程还是会有影响的,后来我找到了一些变通的解决办法,比如c端和js端一起debug,还有就是可以修改源代码,加上这个函数
    static const char* ToCString(Local value) {
     v8::String::Utf8Value string(value);
     char *str = (char *) malloc(string.length() + 1);
     strcpy(str, *string);
     return str;
    }

然后就能打印出v8变量的内容了,虽然使用起来还是不怎么方便


捏个软柿子先

看源代码首先得找个切入点,我一般是从启动流程看起,nodejs的入口在c这段,感觉有点难咧,所以就先看看js这端的启动逻辑吧

1.启动执行的是node.js文件的startup()方法,找到这个入口当然是google来的但是后面看c++部分的时候得到了印证,在node.cc->LoadEnvironment 函数的末尾
 
  Local script_name = FIXED_ONE_BYTE_STRING(env->isolate(), "node.js");
  Local f_value = ExecuteString(env, MainSource(env), script_name);

  Local arg = env->process_object();
  f->Call(global, 1, &arg);

2.上来就是一句var EventEmitter = NativeModule.require('events'),先了解一下NativeModule.require

NativeModule.require->NativeModule.prototype.compile->NativeModule._source = process.binding('natives')->NativeModule.wrap(给source加上一个function(exports, require ...) 这样一个头和尾)->ContextifyScript.runInThisContext(这对应一个c++类在node_contextify.cc)->Local<Script> script = unbound_script->BindToCurrentContext()->script->Run()(这里返回的是一个function,就是wrap加了function头尾的那个)->fn(this.exports, NativeModule.require, this, this.filename)->return nativeModule.exports

这里->只是我思考的顺序不完全代表调用的过程,可以看得出NativeModule.require这个过程还是挺复杂的。require首先通过process.binding从c++那一端拿到module的源代码,然后用wrap方法给加了个头尾,注意这里是个很有趣的元编程技巧,我一直想不通那个长得像关键字的require()函数是从哪里来的,其实就是这个wrap给传进来的,传的值是NativeModule.require
wrap以后的js代码又被传回c++那边给v8引擎执行,执行完以后回到js端返回exports属性

3.接下来一句EventEmitter.call(process)也让我头晕,冷静一点以后发现process其实就是当this给传进去了,然后EventEmitter初始化方法给process对象设置了各种属性

4.接下来各种startup.processXXX都比较好理解,就是设置了各种数据结构,然后是debug相关参数的处理

5.接下来比较重要的一句是startup.preloadModules(),这个是由--require参数触发的预先加载

NativeModule.require('module')._preloadModules(这里比较重要的module.js就登场了)->Module.prototype.require->Module._load->Module._resolveFilename->NativeModule.nonInternalExists->Module.prototype.load->this.paths = Module._nodeModulePaths(path.dirname(filename))(设置查找路径)->Module._extensions[extension](this, filename)(这句看着不起眼,却很重要,用的functional的技巧)->module._compile(internalModule.stripBOM(content), filename)->Module.wrap->runInThisContext(这两个都和NativeModule里面调的一样)->const require = internalModule.makeRequireFunction.call(this)(这句比较诡异,结果还是拿到了module.require函数本身,但是给函数对象加了几个属性)->compiledWrapper.apply(this.exports, args)(和之前NativeModule相同的trick)

整体逻辑就是把参数--require里面的module每个都调一下Module.require,发生的事情也和NativeModule里面的类似。Module和NativeModule的核心逻辑类似,感觉Module的功能更广一些

6.终于走到跑main脚本了,Module.runMain()->Module._load(process.argv[1], null, true) 其实就是load一下目标脚本,后面还跟了一句process._tickCallback,这个是给无孔不入的process.nextTick用的,后面再详细说明

js端启动流程就看得差不多了,这个过程的收获是大致了解了nodejs js端的启动流程,同时也了解了长得像关键字的reqiure的实现,对其中的元编程技巧也是印象深刻。这一段学习过程的方法也比较简单,顺着链路读代码就ok了,对于c++端发生的事情用用grep大致就能分析出来了


初看c++端启动流程

到这里就进入噩梦模式了,刚开始看c++代码的时候最大的问题是把握不好抽象级别,一不小心就扎入一个细节出不来,或者忽略一个关键的细节。第一遍过启动流程的感觉是云里雾里,不过也抓住了几个比较重要的点

1.main(node_main.cc) 启动入口

2.PlatformInit(node.cc)  信号量和文件描述符相关的初始化

3.Init(node.cc) 这里有一个比较重要的调用 uv_default_loop->uv_loop_init(初始化lib_uv的核心数据结构uv_loop_t)。跟踪进去以后看到的是各种QUEUE_INIT(&loop->wq),uv_async_init,uv_signal_init,马上就迷失了,关键问题是不知道这各种宏,各种数据结构都有什么用,所以这里先跳过,等后面经验值够了再回来

4.接下来几句v8::platform::CreateDefaultPlatform,V8::InitializePlatform(default_platform),V8::Initialize()很明显是初始化v8引擎了,查查文档就可以确定是v8的标准用法

5.StartNodeInstance(node.cc) 这里有好多v8相关的东东,所以先得稍微了解一下才能进行下去
只要google一下Isolate,HandleScope 这些关键字就能找到不少讲解这些概念的文章,在这里稍做总结

(1)Isolate代表一个v8 engine实例 ,Context代表一次js执行的上下文
(2)Handle是v8环境的对象句柄,因为指针可能被gc移动,所以必须使用句柄,HandleScope是Handle的一个栈,javascript类型在c++里面都有对应的类型像String,Integer等,c++通过Handle使用这些类型,通过handle可以使用gc来管理
(3)HandleScope管理Handle的生命周期,HandleScope只能分配在栈上, HandleScope对象声明后, 其后建立的Handle都由HandleScope来管理生命周期,HandleScope对象析构后,其管理的Handle将由GC判断是否回收,对于那种需要return的handle,要用HandleScope::Close转交给上一级HandleScope管理
(4)context_scope,handle_scope都是v8的函数,context_scope意味着进入这个context的范围,后面新建的handle都在这个context下面,直到这个context析构
(5)v8::External把C++的对象包装成Javascript中的变量。External::New接受一个C++对象的指针作为初始化参数,然后返回一个包含这个指针的Handle<External>对象供v8引擎使用
(6)v8::Object这种代表javascript里面的对象

6.CreateEnvironment 首先是诡异的set_as_external,set_binding_cache_object,这些都是宏生成的,暂时理解不了先跳过。然后是uv_check_init(),env->idle_check_handle()这些东东,从注释上看是和profile相关的。另外稍微穿越一下,uv_check_init()初始化的handle是在uv_run->uv__run_check 这里执行的

7.SetupProcessObject 刚读到这里由于对v8的使用方式完全不了解所以还是很晕的,但基本能建立的一个关联是这里在设置process对象,这个process应该和js端经常用到的是一个。比如拿env->SetMethod(process, "_setupNextTick", SetupNextTick)在js里面grep一把立马能看到const tickInfo = process._setupNextTick(_tickCallback, _runMicrotasks)这样的使用,这样就把关联建立起来了

8.LoadEnvironment 这里很让人兴奋,上来就是
  Local script_name = FIXED_ONE_BYTE_STRING(env->isolate(), "node.js");
  Local f_value = ExecuteString(env, MainSource(env), script_name);
这不就是在eval node.js了么?不过还有个小机关,MainSource方法里面引用了一个变量node_native,查看一下它的定义在node_natives.h const unsigned char node_native[] = { 47,47,32,72 ...}
这里js2c.py工具会把src/node.js和lib/*.js转换成字节数组生成node_natives.h
node.js eval出来只是一个函数,所以后面有
  Local arg = env->process_object();
  f->Call(global, 1, &arg);
算是真正启动node.js了

9.v8::platform::PumpMessageLoop(default_platform, isolate) 这个调用一般是执行了自己的script之后,因为v8偶尔会放一些task到前台线程执行所以如果使用default v8::Platform,用户需要自己调用PumpMessageLoop让这些task有机会执行,v8的人建议创建自己的v8::Platform

10.uv_run 启动lib_uv,有一个大循环,在循环里运行uv__run_timers,uv__run_pending,uv__run_idle,uv__run_prepare,uv__io_poll等等,读到这里的时候感觉和java nio的reactor模式比较像但是后面看细节还是有很多不同的,这里就会一直跑大循环直到没有需要监听的时间才退出了。事件循环后面是退出逻辑,做一些退出的回调和资源清理


跟踪一个简单的流程

启动流程有很多没看懂的地方,想继续深入就得有交互,看看nodejs是怎么工作的的了。一开始最好不要弄太复杂的流程,所以我选择跟踪一个写文件的流程

 
var fs = require('fs');
 
fs.writeFile("/home/huying/test.txt","my fs!",function(e){
    console.log('write finished...');
})
//fs.writeFileSync("/home/huying/test.txt","my fs!");
console.log('function finished...');



1.想debug得有一个断点,上面这块代码对应的c++端我根本不知道怎么加断点,所以只能往底层分析js代码。翻了翻fs.js,fs.writeFileSync会调用binding.writeBuffer,然后看到binding的定义binding = process.binding('fs'),看到process眼前一亮,关联建立起来了

2.grep binding我们能找到env->SetMethod(process, "binding", Binding) Binding方法有一句
node_module* mod = get_builtin_module(*module_v)就在这里加个断点,从名字看这一句是在读取builtin的模块,所以就不断f8看这里都bind了哪些module,很快就发现有个node_file.cc的module看起来靠谱。

Binding方法还有一句 mod->nm_context_register_func 内容是node::InitFs,看来module初始化的方法就是它了

3.到这里有两个方向可以探索了,一个是可以继续看node写文件的过程,一个是解答之前的一个疑惑--js和c++是怎样交互的,我选择先了解后者。

InitFs里面有不少env->SetMethod(target, "open", Open)这样的调用,"open"这些名字和js端的调用相同,所以Environment::SetMethod就是c++提供函数给js端调用的地方。接下来又google了一把,找到一篇介绍c++,js交互的好文章,不仅有js调c++的还有c++调java的

http://icyblazek.github.io/blog/2015/02/08/v8-ji-chu-ru-men/

精简摘录一下

(1)设置全局变量给js使用
v8::Handle global = v8::ObjectTemplate::New(globalIsolate);

global->SetAccessor(v8::String::NewFromUtf8(globalIsolate, "globalValue"), (AccessorGetterCallback)globalGetter, (AccessorSetterCallback)globalSetter);

Local source = String::NewFromUtf8(globalIsolate, "var tmpValue = globalValue; globalValue = 21; ");

(2)设置全局函数给js使用
Local globalFunTemplate = v8::FunctionTemplate::New(globalIsolate, (FunctionCallback)globalFun);

global->Set(v8::String::NewFromUtf8(globalIsolate, "globalFun"), globalFunTemplate);

(3)设置一个类给js使用
v8::Local personClass = v8::FunctionTemplate::New(globalIsolate, (FunctionCallback)createPerson);

personClass->SetClassName(v8::String::NewFromUtf8(globalIsolate, "Person"));
v8::Handlep_Prototype = personClass->PrototypeTemplate();

p_Prototype->Set(String::NewFromUtf8(globalIsolate, "sayHello"), FunctionTemplate::New(globalIsolate, Person_SayHello));

v8::Handle personInst = personClass->InstanceTemplate();
personInst->SetInternalFieldCount(1);
global->Set(v8::String::NewFromUtf8(globalIsolate, "Person"), personClass);

Local source = String::NewFromUtf8(globalIsolate, "var p = new Person('Kevin', 'Lu'); p.sayHello();");

(4)c++访问js变量和方法
v8::Local source = String::NewFromUtf8(callJSISolate, "function Person() { this.name = 'Kevin'; } Person.prototype.getName = function () { return this.name; }; var p = new Person();");

v8::Local<Script> compiled_script = v8::Script::Compile(source);
compiled_script->Run();

v8::Handle data_p = context->Global()->Get(String::NewFromUtf8(callJSISolate, "p"));

v8::Handle<Object> object_p = Handle<Object>::Cast(data_p);

v8::Handle getName = Handle::Cast(object_p->Get(String::NewFromUtf8(callJSISolate, "getName")));

Handle value = getName->Call(object_p, 0, NULL);

String::Utf8Value utf8(value);
printf("call js function result: %s\n", *utf8);

到这里就基本搞清楚nodejs c++和js是怎么交互的了,再回去看才c++端启动的时候SetupProcessObject的逻辑就比较容易了

4.process.binding方法在js端经常看到,感觉这是一个很好的debug的点,通过这个点可以比较清晰的了解c++和js两端的交互,所以要重点看一下。

Environment::GetCurrent(args)->static_cast<Environment*>(info.Data().As<v8::External>()->Value()) 这一句上来我就醉了这个info是v8::FunctionCallbackInfo<v8::Value>,这个Data()取的是什么值?我google了半天文档也没找到这个是干啥的,绕了好久才找到关联,在v8-inl.h里面v8::FunctionTemplate::New(isolate(), callback, external, signature) data是第三个参数,而这个external是Environment,所以感觉data是v8给function绑定外部变量的一种方式

之前node启动的时候看不懂的set_as_external(v8::External::New(isolate(), this)) 现在也能理解了,实际上就是把Environment给设置城external变量了。到这里真的感觉看源代码就是在玩一个解谜游戏,不断的找各种线索,建立关联,开启新地图。。。

Binding后面的逻辑还是比较简单的,如果env的binding_cahce里面有moudle就直接返回,否则生成一个新moudle并返回,如果是build的module比如util,会调用到node::util::Initialize 如果是javascript module就直接给module设置name->js source

这里的另外一个收获是,Binding调用栈的前一帧FunctionCallbackArguments::Call(FunctionCallback f),只要是js调c++都会走到这里来,所以这是比Binding更好的一个debug点

5.继续回到之前的那条线看看node怎么操作文件的,先看看同步类型的操作writeFileSync
其实是有三个操作组成的,openSync,writeSync,closeSync
openSync->Open(node_file.cc)->uv_fs_open(fs.c)->uv__fs_work->uv__fs_open->open(系统调用)
writeSync->WriteBuffer(node_file.cc)->uv_fs_write(fs.c)->uv__fs_work->uv__fs_write->pwrite
顺便搜了一把pwrite,这个调用原子操作完成seek和write,多线程操作的时候不用加锁

6.open和openSync的差别从uv_fs_open的POST宏开始,异步流程走到了uv__work_submit

(1)uv_once 调用pthread_once做初始化,threadpool.c->init_once,首先如果配置了线程池大小而且比默认的大就初始化一下线程对应的内存,然后初始化mutex和condition这个和java里面的类似但麻烦不少,然后启动线程池里面的线程。这里注意debug里面显示的线程,在init_once之前有5个线程,1个node主线程,4个v8工作线程,init_once之后又变出了4个node线程

(2)pthread_mutex_lock利用互斥锁加锁,然后把任务放进全局任务队列,调pthread_cond_signal告诉某个线程去读取任务执行任务,这里和java也很类似

(3)这里看到了QUEUE* q参数,从名字上看应该是个队列,应该很简单,但还是花了不少时间才理解。typedef void *QUEUE[2] 定义了void*的数组,而这是在描述一个双向链表结构,QUEUE* q是指向QUEUE的指针,(*(q))[0]是上一个节点,&((*(q))[0])的类型是void**,
(QUEUE **) &((*(q))[0])的意思是把void**转成了QUEUE **,左边再加上一颗*就又把类型转回了QUEUE *所以(*(QUEUE **) &((*(q))[0]))最终返回了QUEUE *就是当前节点的上一个节点,这么曲折是因为要给void*做类型转换。

#define QUEUE_DATA(ptr, type, field)                                          
((type *) ((char *) (ptr) - offsetof(type, field)))

uv_handle_t* handle = QUEUE_DATA(q, uv_handle_t, handle_queue);

#define UV_HANDLE_FIELDS                                                      \
  /* public */                                                                \
  void* data;                                                                 \
  /* read-only */                                                             \
  uv_loop_t* loop;                                                            \
  uv_handle_type type;                                                        \
  /* private */                                                               \
  uv_close_cb close_cb;                                                       \
  void* handle_queue[2];                                                      \
  union {                                                                     \
    int fd;                                                                   \
    void* reserved[4];                                                        \
  } u;                                                                        \
  UV_HANDLE_PRIVATE_FIELDS                                                    \

/* The abstract base class of all handles. */
struct uv_handle_s {
  UV_HANDLE_FIELDS
};

上面这段是要取队列节点上的数据,这里和java双链表最大的差别是java一般是节点包含数据,而这里是双链信息包含在数据里面。QUEUE_DATA的做法是拿ptr(指向void* handle_queue[2])减去handle_queue在uv_handle_t里面的偏移量从而得到uv_handle_t的地址最后做个类型转换。理解了上面两个难点,基本就可以理解QUEUE了

(4)继续看异步工作的流程,把断点加在uv__fs_open(fs.c)上面,就能看到uv__work_submit提交的工作在新生成的线程6里面执行了,worker()->uv__fs_work()->uv__fs_open()

(5)异步工作做完以后会通知主线程,uv__async_send(async.c),这里面实际通知的逻辑是
r = write(fd, buf, len) 居然写文件了,buf的内容就是个1,因为主线程在做io poll,所以马上捕捉到写事件,uv__io_poll(linux-core.c)->uv__async_io(async.c)->uv__async_event->uv__work_done->uv__fs_done(fs.c)  uv__async_io异步回调什么时候被注册的呢?grep了一把,uv__async_start->uv__io_init(&wa->io_watcher, uv__async_io, pipefd[0]),这个模式就很清晰了

这里异步通知的工作方式和java的玩法完全不同,第一感觉是会不会有性能问题?所以继续深入了解了一下,http://www.codexiu.cn/linux/blog/12066/
这里write的fd不是普通的文件,是通过系统调用__NR_eventfd2创建的,是linux提供的一种内建的异步支持实际上是内存中的一个64位无符号型整数,代码读到这个地方隐隐感觉多路复用epoll是个应用很广泛的模式,不仅限于io

(6)再看看open和write操作是怎么连接起来的,
uv__io_poll(linux-core.c)->uv__async_io(async.c)->uv__async_event->uv__work_done->uv__fs_done(fs.c)->After(node_file)->MakeCallback(async-wrap.cc)->node::WriteBuffer->uv_fs_write(fs.c)->uv__work_submit(threadpool.c)->uv__io_poll ...

(7)在AsyncWrap::MakeCallback函数的尾部有这么一段,
  if (tick_info->length() == 0) {
    env()->isolate()->RunMicrotasks();
  }

  if (tick_info->length() == 0) {
    tick_info->set_index(0);
    return ret;
  }

  tick_info->set_in_tick(true);

  env()->tick_callback_function()->Call(process, 0, nullptr);

立马觉得这个和之前关注的技术点tick的实现方式有关联,另外RunMicrotasks这个名字也看起来是一个有故事的函数,所以也了解了一下
http://stackoverflow.com/questions/25915634/difference-between-microtask-and-macrotask-within-an-event-loop-context,不过这里暂时不展开,后面再专注的了解

(8)在我的basicFile.js里面写文件完成以后有一句console.log('write finished...'),console.log经常被用到所以也想顺便看看实现,因为之前找到了很好的js调c++的debug点FunctionCallbackArguments::Call,所以很轻松就找对了地方
FunctionCallbackArguments::Call ->StreamBase::JSMethod -> StreamBase::WriteString


跟踪一个HTTP请求

跟踪http请求是debug网络框架的标准手段,下面是debug用的js代码
 
var http = require('http');

http.createServer(function (req, res) {
    console.log('http function invoked');
    res.writeHead(200, {'Content-Type': 'text/plain'});
    res.end('Hello World\n');
}).listen(3000);
console.log('Server running at http://localhost:3000/');

1.还是要考虑第一个断点加在哪里好,因为前面已经见过了uv__io_poll循环epoll的流程,所以可以先把断点设在这里看看,启动node,然后在浏览器发个http请求进来,果然停在了断点w->cb(loop, w, pe->events)。一路跟踪下去可以得到这样的调用链,uv__io_poll->uv__server_io->TCPWrap::OnConnection,从名字看这是接收了连接。那哪里发起的监听呢?TCPWrap::Listen看起来比较像,加个断点debug一下,果然是的,所以整个流程开始的一段是
js端发起->TCPWrap::Listen->uv_listen->uv_tcp_listen->listen(系统调用)

2.listen调用后面有一句uv__io_start(tcp->loop, &tcp->io_watcher, UV__POLLIN)值得重视,这个函数把&tcp->io_watcher加到loop->watchers里面,loop是lib_uv全局核心数据结构,这个方法其实就是注册一个poll监听的请求,后面的while循环会从请求列表里面把请求一个一个拿出来

3.找到断点怎么加后面就比较简单了,下面是整个流程
TCPWrap::Listen->uv_listen->uv_tcp_listen->listen->uv__io_start()->uv__io_poll->uv__server_io->uv__accept4->TCPWrap::OnConnection->v8->TCPWrap::New->stream.c.uv_accept(初始化client_handle)->MakeCallback(这个应该就是去触发js callback)->v8->StreamBase::ReadStart
->uv_read_start(做stream read相关数据结构的初始化,调用uv__io_start注册read事件监听)->node::Parser::Init()->_http_server.js.connectionListener->node::Parser::Consume()->uv__server_io(next loop)->uv__stream_io->uv__read->read(系统调用)->node::StreamResource::OnRead->node::Parser::Execute->到达状态s_headers_almost_done(遇到\n)->node::Parser::on_headers_complete->node::StreamBase::WriteString->uv_write2->uv__write->write(系统调用)->node::StreamBase::Writev(写入http返回码200)->node::Parser::on_message_complete(into js,其它的on也是在parse过程中调的)->后面就算结束了
这里让我比较纠结的是http parse的过程是在主线程里做的,如果把比较耗cpu的parse放到线程池里面做然后异步通知主线程会不会对cpu使用得更充分一些呢?


了解重点技术点

这个时候对node的代码已经有熟悉感了,所以下一步是集中了解一下比较重要的技术点了。之前提了7个技术点,还有3.nextTick和setTimeout这些是怎么实现的 5.nodejs里面的domain能捕获相关的异常看着很酷,怎么实现的(这个点还是读源代码过程中发现的) 6.epoll为啥那么牛逼 7.了解c/c++的编程风格 这几个问题没有搞定

nextTick相关实现

1.首先可以看看这两篇文章
http://stackoverflow.com/questions/25915634/difference-between-microtask-and-macrotask-within-an-event-loop-context
https://simeneer.blogspot.jp/2016/09/nodejs-eventemitter.html

提交式的异步任务方式分task和microtask,macrotasks有setTimeout, setInterval, setImmediate  microtasks有process.nextTick, Promises

个人理解microtask比较轻,放到当前同步块之后立即执行,堆积起来一起执行,task比较重一般是一个event loop执行一次,两个task之间可以能io,浏览器渲染这些动作,也是把queue执行完,区别是nextTick如果递归会卡住一直执行下去,setTimeout这种如果递归是放到下一个event loop里面跑,所以不会卡死

2.使用https://simeneer.blogspot.jp/2016/09/nodejs-eventemitter.html里面给的代码来debug
 
console.log('<0> schedule with setTimeout in 1-sec');
setTimeout(function () {
    console.log('[0] setTimeout in 1-sec boom!');
}, 1000);

console.log('<1> schedule with setTimeout in 0-sec');
setTimeout(function () {
    console.log('[1] setTimeout in 0-sec boom!');
}, 0);

console.log('<2> schedule with setImmediate');
setImmediate(function () {
    console.log('[2] setImmediate boom!');
});

console.log('<3> A immediately resolved promise');
aPromiseCall().then(function () {
    console.log('[3] promise resolve boom!');
});

console.log('<4> schedule with process.nextTick');
process.nextTick(function () {
    console.log('[4] process.nextTick boom!');
});

function aPromiseCall () {
    return new Promise(function(resolve, reject) {
        return resolve();
    });
}


3.在uv_run while大循环里面执行的
uv_run->uv__run_timers (setTimeout)
uv_run->uv__run_check(setImmediate)

process.nextTick和Promise call(对应Isolate::RunMicrotasks)基本是一起出现,调用的点比较多
比如AsyncWrap::MakeCallback,Environment::KickNextTick等等,基本是处理完一次io就会调用一次

domain的实现

1.因为nodejs有不少当前调用链之外的调用,这些调用的异常无法被当前调用链的try catch捕获,所以用domain统一处理异常,nextTick,timer,event这些是比较典型的使用场景,首先弄一端代码来debug

 
var domain = require("domain");
var d = domain.create();
d.on('error', function(err) {
    console.error('Error caught by domain:', err);
});

d.run(function() {
    process.nextTick(function() {
        fs.readFile('non_existent.js', function(err, str) {
            if(err) throw err;
            else console.log(str);
        });
    });
});


2.这个魔法到底是哪里发生的呢?有两种可能,d.run里面做了某种hack,拦截了异常调用链,或者异常抛到了外面被v8引擎拦截然后用某种事件机制通知回调到domain.run

 
domain2.on('error', function(err){
    console.log("domain2 处理这个错误 ("+err.message+")");
});

try{
    domain2.run(function(){
        console.log('good');
        throw "pig";
    });
    
}catch(err){
    console.log('outter');
}

下面这段代码执行的结果是outter,所以应该是全局捕获了

3.由于domain这个单词有一定特殊性,所以全局grep一下就能找到线索,相关调用链
LoadEnvironment(node.cc)->AddMessageListener(OnMessage)->node::OnMessage->FatalException(node.cc)->process._fatalException(node.js)->domain._errorHandler(domain.js)

process._setupDomainUse(domain.js)->SetupDomainUse(node.cc)->_tickDomainCallback(node.js)


epoll相关

由于能接触到epoll的系统调用了有了一些感性认识,所以想深入的了解一下。但是这里我并不准备debug内核,因为相关的知识和准备还不具备,做这件事情可能相当费时间,所以先在网上找我能理解的文章去建立一些基本认识,等到以后有机会debug内核的时候再去验证这些认识

 1.比较是了解一项技术常用的手段,epoll之前有poll,select,epoll比这两牛在什么地方呢?就从这个角度去搜索,找到了一份不错的文章
http://blog.csdn.net/xiajun07061225/article/details/9250579

2.epoll有三个函数,epoll_create,epoll_ctl,epoll_wait三个函数,epoll_create会返回一个fd
epoll_ctl可以用来增加新的fd到监听fd列表里面
select的api是这样的select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout)
epoll_wait是这样的epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

所以第一个差别是api导致的,从上面的调用方式就可以看到epoll比select/poll的优越之处:因为后者每次调用时都要传递你所要监控的所有socket给select/poll系统调用,这意味着需要将用户态的socket列表copy到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。而我们调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表

3.epoll是事件就绪通知而不是像select那样主动扫描所有监听的fd的集合,所以性能不会随fd增加而下降,这里的核心是epoll维护一个数据结构,其中有一个准备就绪链表,当数据可读时间发生的时候就会把相应的fd放到这个链表里面,epoll只关心缓冲区非满和缓冲区非空事件

4.epoll提供的就绪fd数组使用mmap节省复制开销

5.我猜想的流程是,网卡有数据->内核把数据复制到读缓冲区->触发中断回调程序把socket复制到准备就绪fd列表->epoll_wait返回

6.这里我对读写什么时候会阻塞有一些疑惑,所以顺便找到了一篇不错的文章http://www.cnblogs.com/promise6522/archive/2012/03/03/2377935.html
write成功返回,只是buf中的数据被复制到了kernel中的TCP发送缓冲区。至于数据什么时候被发往网络,什么时候被对方主机接收,什么时候被对方进程读取,系统调用层面不会给予任何保证和通知

编程风格和代码结构

1.基于宏的仿继承,UV_HANDLE_FIELDS<-UV_STREAM_FIELDS<-UV_PIPE_PRIVATE_FIELDS ...

2.基于宏的代码生成 
    set_as_external
    ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES
    先定义V,然后在另一个宏里面去使用V

3.src/*.cc,这里是真正node的c++代码,node.cc是入口和核心启动和装配Process对象以及一些核心的回调方法(给js端用的)。env.cc 类似于一个全局的context用于传输全局参数,比如module_load_list_array=process.moduleLoadList 传送于c++和js之间。xxx_wrap.cc node和lib_uv的桥梁,接js的回调转发到lib_uv的库

4.libuv,uv.h定义uv提供的函数比如uv_run,uv_loop_init等等。win/unix 提供平台相关实现,unix下面有aix.c,kqueue.c,linux-core.c,sunos.c等等

重温启动流程

第一遍看启动流程的时候还有一些地方没太看懂,现在重新复习一下同时也可以把所了解的知识串起来

1.uv__signal_global_once_init 调用uv__make_pipe创建一个管道fd[3,4] (0,1,2是标准错误标准输出这些)属性是uv__signal_lock_pipefd,这对pipeline是用来实现加锁信号量的读pipeline的一端被锁住直到读出合法的值,写的那一端写入*解锁

2.uv_loop_t的属性初始化,loop->timer_heap(timer堆) loop->wq(工作队列) loop->active_reqs(活跃请求队列,active_reqs是个数组联想一下之前看过的QUEUE的定义)  loop->async_handles(异步handle列表,handle类似于http session这种概念,相当于开了一个异步通道,然后这个异步通道上面可以持续有异步请求发生,而request是短暂型对象通常对应handle上的一个io操作,request会用data属性传值,但是多个async handle可以共用一个async fd) loop->nfds(watch的fd的数量) loop->watchers(存放uv__io_t和对应的fd,这个非常非常核心,就是epoll监听的文件描述符列表) loop->pending_queue(发生连接错误或者写结束之类的会把请求扔到这个队列) loop->watcher_queue(待注册epoll的事件会放到这个队列里面) loop->closing_handles(关闭事件的handle列表) loop->signal_pipefd(信号处理的pipeline的一对fd) loop->backend_fd(epoll create出来的fd,后面的epoll调用都依赖这个fd) loop->emfile_fd(EMFILE进程fd用尽,uv__emfile_trick备用一个fd,如果fd用尽就关闭这个备用的fd,然后就可以accept新的连接,accept的同时关闭并告诉客户端fd用尽的状态准备fd的方法是只读的方式打开/根目录)

3.uv__platform_loop_init 调用epoll create初始化fd=5

4.uv_signal_init 创建管道loop->signal_pipefd[6,7] 初始化loop->signal_io_watcher结构,调用uv__io_start把signal_io_watcher加到loop->watchers里面去后面注册epoll就会注册上去。 loop->child_watcher是一个uv_handle_t也会在这里初始化

uv_signal_init 类似这种初始化方式 处理的是linux信号量这种,uv_signal_start 真正启动信号处理,uv__signal_register_handler和uv__signal_handler把流程打通,调用流程是 信号量产生->信号量callback->write pipeline->select on pipeline另一端->调用目标函数

5.初始化读写锁cloexec_lock,打开文件的时候会读这个锁,fork进程的时候会写这个锁。初始化wq_mutex,这个主要事线程池用

6.初始化wq_async,这个也主要是threadpool.c在用,异步io的逻辑主要从这里走。首先通过系统调用__NR_eventfd2创建一个fd,http://www.codexiu.cn/linux/blog/12066/这不是普通的fd,是linux提供的一种内建的异步支持实际上是内存中的一个64位无符号型整数,这里返回的fd=8,调用uv__io_start加到loop里面。后面有一大堆的goto来处理初始化失败去关闭资源,没有exception有点杯具啊


回味一下node的编程模式

























引用一下这张比较经典的event loop的图片,node其实就是在一个主循环里面把大多数逻辑全部做掉。回想之前的http的例子,启动的时候其实就是注册了一大堆listener,然后在poll里面触发事件,由事件触发一长串调用直到写response成功以后返回,然后主线程才能进行下一次poll,而各种nextTick,setTimeout其实就是把任务放到queue里面然后找各个事件函数运行的间歇期去运行。再回想之前写文件的例子,这个例子是真异步,第一次操作在uv__work_submit之后就返回给主线程,主线程这时就继续跑poll,然后异步线程通过uv_async_send去通知poll然后执行之前的异步回调。所以感觉异步就是把程序切成一帧一帧的,帧和帧之间的关联记在某个数据结构里面,然后主线程就是一帧一帧的去跑这些程序帧

node还有一个感觉比较清爽的是把io,async,signal等等用统一的poll循环的方式给表达出来了,编程方式很统一,有点优雅的感觉。

学到了什么?

终于要结束这一段艰难的冒险了,应该还有一些精彩的地方没有浏览到,但那些地方没有和我现有的知识建立关联,而穷举法是效率比较低的方式,所以暂时就到此为止了。总结一下这段冒险自己的收获吧。

1.对node有了一个比较深刻的理解
2.node require元编程的trick令我印象深刻
3.node domain对异步异常的处理方式也很值得学习
4.大致了解了v8引擎的用法
5.对libuv有了一个比较深刻的认识
6.学习了c里面一个比较精致的双向链表实现,同时学会了看复杂的多重指针
7.对信号量,管道,异步这些的理解深了一层,同时学到了一种比较优雅的统一处理这些事件的方式
8.对异步编程的本质理解更加深刻了
9.重温了c和c++的一些语法,对c/c++大型程序如何组织也有了一点认识
10.对epoll的理解深刻了一些
11.学习了一些基本的linux系统调用
12.了解了pthread的基本用法
13.对nodejs积木式创新的模式印象很深刻,同时联想到了weex

这样算下来收货还是挺丰富的,没有白投入这么多的时间