先说说问题2,一般的解决方案是让下游做幂等或者尽量每消费一条消息都记位点,对于少数严格的场景可能需要把位点和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费位点,然后更新下游数据的时候用消费位点做乐观锁拒绝掉旧位点的数据更新。
问题1的解决方案也就是kafka实现的方案是每个producer有一个producer id,服务端会通过这个id关联记录每个producer的状态,每个producer的每条消息会带上一个递增的sequence,服务端会记录每个producer对应的当前最大sequence,如果新的消息带上的sequence不大于当前的最大sequence就拒绝这条消息,问题1的场景如果消息落盘会同时更新最大sequence,这个时候重发的消息会被服务端拒掉从而避免消息重复。后面展开详细说一下这个解决方案。
1.最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见
2.producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务
3.kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交
4.producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务
5.流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别
一个比较典型的consume-transform-produce的场景像下面这样
public class KafkaTransactionsExample {
public static void main(String args[]) {
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer producer = new KafkaProducer<>(producerConfig);
producer.initTransactions();
while(true) {
ConsumerRecords records = consumer.poll(CONSUMER_POLL_TIMEOUT);
if (!records.isEmpty()) {
producer.beginTransaction();
List> outputRecords = processRecords(records);
for (ProducerRecord outputRecord : outputRecords) {
producer.send(outputRecord);
}
sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
producer.endTransaction();
}
}
}
}
几个关键概念和推导
1.因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举和failover上面类似。
2.事务管理中事务日志是必不可少的,kafka使用一个内部topic来保存事务日志,这个设计和之前使用内部topic保存位点的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态。
3.因为事务存在commit和abort两种操作,而客户端又有read committed和read uncommitted两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message。
4.producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联,这个就是TransactionalId,一个producer挂了,另一个有相同TransactionalId的producer能够接着处理这个事务未完成的状态。注意不要把TransactionalId和数据库事务中常见的transaction id搞混了,kafka目前没有引入全局序,所以也没有transaction id,这个TransactionalId是用户提前配置的。
5. TransactionalId能关联producer,也需要避免两个使用相同TransactionalId的producer同时存在,所以引入了producer epoch来保证对应一个TransactionalId只有一个活跃的producer epoch
重要的类图
官方文档的数据流组件图
上图中每个方框代表一台独立的机器,图下方比较长的圆角矩形代表kafka topic,图中间的两个角是圆的的方框代表broker里面的逻辑组件,箭头代表rpc调用。
接下来说一下事务的数据流,这里基本按照官方文档的结构加上我自己看代码的一点补充
1.首先producer需要找到transaction coordinator。TransactionManager.lookupCoordinator
private synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
switch (type) {
case GROUP:
consumerGroupCoordinator = null;
break;
case TRANSACTION:
transactionCoordinator = null;
break;
default:
throw new IllegalStateException("Invalid coordinator type: " + type);
}
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
enqueueRequest(new FindCoordinatorHandler(builder));
}
2.获取producer id,producer id是比较重要的概念,精确一次投递需要producer id+sequence防止重复投递,事务消息也需要保存transactional id和producer id的对应关系。客户端调用KafkaProducer.initTransactions的时候会向coordinator请求producer id,TransactionManager.initializeTransactions
public synchronized TransactionalRequestResult initializeTransactions() {
ensureTransactional();
transitionTo(State.INITIALIZING);
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
this.sequenceNumbers.clear();
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
InitProducerIdHandler handler = new InitProducerIdHandler(builder);
enqueueRequest(handler);
return handler.result;
}
coordinator端处理逻辑在TransactionCoordinator.handleInitProducerId流程比较复杂,首先如果对应的transactional id没有产生过producer id会找producerIdManager生成一个
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).right.flatMap {
case None =>
val producerId = producerIdManager.generateProducerId()
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
producerId = producerId,
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
txnTimeoutMs = transactionTimeoutMs,
state = Empty,
topicPartitions = collection.mutable.Set.empty[TopicPartition],
txnLastUpdateTimestamp = time.milliseconds())
txnManager.putTransactionStateIfNotExists(transactionalId, createdMetadata)
case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
}
producer id需要全局唯一,有点类似于tddl sequence的生成逻辑,ProducerIdManager.generateProducerId会一次申请一批id然后在zk上面保存状态,本地每次生成+1,如果超出了当前批次的范围就去找zk重新申请
拿到了producer id接下来处理事务状态,保证之前的事务状态能够处理完毕,该提交的提交,该回滚的回滚。
TransactionCoordinator.prepareInitProduceIdTransit处理producer id的变化比如开始一个新的事务可能会增加producer epoch,也可能生成新的producer id
case PrepareAbort | PrepareCommit =>
// reply to client and let it backoff and retry
Left(Errors.CONCURRENT_TRANSACTIONS)
case CompleteAbort | CompleteCommit | Empty =>
val transitMetadata = if (txnMetadata.isProducerEpochExhausted) {
val newProducerId = producerIdManager.generateProducerId()
txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds())
} else {
txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds())
}
Right(coordinatorEpoch, transitMetadata)
case Ongoing =>
// indicate to abort the current ongoing txn first. Note that this epoch is never returned to the
// user. We will abort the ongoing transaction and return CONCURRENT_TRANSACTIONS to the client.
// This forces the client to retry, which will ensure that the epoch is bumped a second time. In
// particular, if fencing the current producer exhausts the available epochs for the current producerId,
// then when the client retries, we will generate a new producerId.
Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
最后如果之前的事务处于进行中的状态会回滚事务
handleEndTransaction(transactionalId,
newMetadata.producerId,
newMetadata.producerEpoch,
TransactionResult.ABORT,
sendRetriableErrorCallback)
或者就是新事务,往事务日志里面插一条日志(对应数据流图中的2a)
txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback)
3.客户端调用KafkaProducer.beginTransaction开始新事务。这一步相对简单,就是客户端设置状态成State.IN_TRANSACTION
4.consume-transform-produce过程,这一步是实际消费消息和生产消息的过程。
(1)客户端发送消息时(KafkaProducer.send),对于新碰到的TopicPartition会触发AddPartitionsToTxnRequest。服务端对应的处理在TransactionCoordinator.handleAddPartitionsToTransaction,主要做的事情是更新事务元数据和记录事务日志(对应数据流图中的4.1a)。在事务中记录partition的作用是后面给事务每个partition发送提交或者回滚标记时需要事务所有的partition。
(2)客户端通过KafkaProducer.send发送消息(ProduceRequest),比较早的kafka版本增加了PID,epoch,sequence number等几个字段,对应数据流图中的4.2a
(3)客户端调用KafkaProducer.sendOffsetsToTransaction保存事务消费位点。服务端的处理逻辑在TransactionCoordinator.handleAddPartitionsToTransaction,和4.1基本是一样的,不同的是4.3记录的是记录消费位点的topic(GROUP_METADATA_TOPIC_NAME)。
(4)4.3调用的后半部分会触发TxnOffsetCommitRequest,通过数据消息的方式把消费位点持久化到GROUP_METADATA_TOPIC_NAME(__consumer-offsets)这个topic里面去,对应数据流图中的4.4a。
客户端发起逻辑在AddOffsetsToTxnHandler.handleResponse
if (error == Errors.NONE) {
log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
builder.consumerGroupId());
// note the result is not completed until the TxnOffsetCommit returns
pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
transactionStarted = true;
}
因为需要处理可见性相关的逻辑,服务端事务消费位点和普通消费位点提交的处理逻辑稍有不同,调用GroupCoordinator.handleTxnCommitOffsets而不是handleCommitOffsets。
5.结束事务需要调用KafkaProducer.commitTransaction或者KafkaProducer.abortTransaction
(1)首先客户端会发送一个EndTxnRequest,而服务端由TransactionCoordinator.handleEndTransaction处理。
handleEndTransaction首先会做一个可能的状态转换让事务进入预提交或者预放弃阶段
else txnMetadata.state match {
case Ongoing =>
val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
PrepareCommit
else
PrepareAbort
Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
接下来会在事务日志里面记录PREPARE_COMMIT或者PREPARE_ABORT日志,对应数据流图中的5.1a
txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)
再接下来会往用户数据日志里面发送COMMIT或者ABORT的Control Message,最后往事务日志里面写入COMMIT或者ABORT,才算完成了事务的提交过程。这个过程是用回调的方式组织起来的,代码的流程是TransactionStateManager.appendTransactionToLog->TransactionMarkerChannelManager.addTxnMarkersToSend->TransactionStateManager.appendTransactionToLog
(2)往用户数据日志里面发送COMMIT或者ABORT的Control Message的过程,对应数据流图中的5.2a
发起点在回调方法sendTxnMarkersCallback,这个方法首先会做转台转换让事务进入CompleteCommit或者CompleteAbort状态
case PrepareCommit =>
if (txnMarkerResult != TransactionResult.COMMIT)
logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
else
Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
case PrepareAbort =>
if (txnMarkerResult != TransactionResult.ABORT)
logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
else
Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
最后会往事务相关的每个broker发送WriteTxnMarkersRequest,如果事务包含消费位点也会往__consumer-offsets所在的broker发请求。
broker端的处理在KafkaApis.handleWriteTxnMarkersRequest会把control message写入日志
replicaManager.appendRecords(
timeout = config.requestTimeoutMs.toLong,
requiredAcks = -1,
internalTopicsAllowed = true,
isFromClient = false,
entriesPerPartition = controlRecords,
responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
(3)事务日志写入最终的COMMIT或者ABORT日志,对应数据流图的5.3,这一步完成了一个事务就算彻底完成了。
发起点在回调方法appendToLogCallback
精确一次投递
大致列一下流程中的关键节点
1.在客户端每次发送消息之前会检查是否有producerId如果没有会找服务端去申请,Sender.run
if (transactionManager != null) {
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
}
1.sequenceNumber可以设计成每个producer唯一或者更细粒度的对每个topic-partition唯一,topic-partition唯一的好处是对于每个topic-partition sequenceNumber可以设计成连续的,这样broker端可以做更强的校验,比如检查丢消息,kafka使用的就是细粒度的方法,发现sequenceNumber不连续的时候会抛异常OutOfOrderSequenceException
2.发消息(KafkaProducer.doSend)是个异步的过程,但同时提供Future返回值使得在必要的时候可以把异步变成同步等待。kafka也实现了攒消息批量发送的能力(RecordAccumulator.append),攒消息的存放方式是一个大hash map,key是topic-partition,消息实际刷出和发送在一个单独的线程中执行,调用Sender.sendProducerData。被刷出的消息的判定在RecordAccumulator.ready,主要依据是消息集是否已满或者是否超时。
3.Consumer消费的时候怎样控制事务可见性呢?一个比较直观的方法就是先把事务消息buffer起来,然后遇到提交或者回滚标志的时候做相应的处理,kafka处理的更巧妙一些。首先是不能读到未提交的事务的控制,kafka引入了lastStableOffset这个概念,lastStableOffset是当前已经提交的事务的最大位点。在ReplicaManager.readFromLocalLog里面有控制,