
- RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息
使用背景可以参考阿里云的《事务消息》介绍
2.1 传统的大事务可以被拆分为小事务,通过RocketMQ版分布式事务消息保证数据的最终一致性
- 在github上拉取RocketMQ源码,具体源码搭建环境参考《搭建RocketMq源码调式环境》启动namesrv和broker启动example的事务生产者TransactionProducer
// 事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
// 启动生产者
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
int i = 0;
Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
producer.shutdown(); //断点在这,避免生产者被立马关闭
事务发送提交过程:
1.1 发送消息(half消息)
1.2 服务端响应消息写入结果
1.3 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
1.4 根据本地事务状态执行Commit或者Rollback(Commit *** 作生成消息索引,消息对消费者可见)
发送消息入口TransactionMQProducer#sendMessageInTransaction
2.1 debug DefaultMQProducerImpl#sendMessageInTransaction源码发现,其实事务消息主要就是多了TRAN_MSG=true的事务标识(关于服务端如何响应见下面分析3)
2.2 如果半消息发送成功,则触发事务监听器执行本地事务
2.3 根据本地事务状态,向broker发送commit或者rollback请求(详情见下方4)
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// 事务监听器
TransactionListener transactionListener = getCheckListener();
// 添加TRAN_MSG=true事务消息标识,以及PGROUP=please_rename_unique_group_name
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// 发送半消息
SendResult sendResult = this.send(msg);
// 消息发送成功处理
switch (sendResult.getSendStatus()) {
case SEND_OK: {
//消息的UNIQ_KEY属性作为事务ID
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (transactionListener != null) {
log.debug("Used new transaction API");
// 执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
}
}
// 根据本地事务状态,向broker发送commit或者rollback请求
this.endTransaction(msg, sendResult, localTransactionState, localException);
}
发送事务消息后,broker是怎样响应的呢?
3.1 首先看一下SendMessageProcessor#asyncSendMessage,发现是根据事务标识做分别处理
private CompletableFutureasyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) { Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); CompletableFuture putMessageResult = null; // 判断消息是否有事务标识,有则按照事务消息处理,否则按照普通消息处理 if (transFlag != null && Boolean.parseBoolean(transFlag)) { putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); }else { putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); } }
3.2 处理事务消息TransactionalMessageBridge#asyncPutHalfMessage
(1)将消息转换为半消息parseHalfMessageInner,更换主题以及队列,防止被消费端消费
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 备份原消息的topic和queueId
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 设置消息的系统标识为TRANSACTION_NOT_TYPE
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 重新赋值消息的topic为RMQ_SYS_TRANS_HALF_TOPIC以及queueId为0
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
(2)DefaultMessageStore#asyncPutMessage将半消息写入commitLog
发送commit或rollback请求:DefaultMQProducerImpl#endTransaction
4.1 根据事务状态设置CommitOrRollback属性,向broker发送请求
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
// 找到半消息所在队列的broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
// 组装事务提交/回滚请求报文
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
// 根据本地事务状态,设置commitOrRollback属性进行提交或者回滚
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
// 以oneway的方式发送broker(单向:只发送请求不等待应答)
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
4.2 broker接收EndTransaction的入口在EndTransactionProcessor#processRequest
(1)根据请求的偏移量获取指定的half消息
(2)将half消息 转换为原始消息(topic、queueId更换)
(3)将消息发送到真正的Topic里,该消息可以开始下发给消费者
(4)如果落盘成功,则删除prepare消息,其实是将消息写入到Op Topic里
(5)如果是Rollback,则直接将消息转换为原消息,并写入到Op Topic里
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 获取半消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 将prepare消息转换为原消息,该消息的Topic就是真正消息的Topic
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
// 将消息发送到真正的Topic里,该消息可以开始下发给消费者
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//如果落盘成功,则删除prepare消息,其实是将消息写入到Op Topic里
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
}
}
}
- RocketMQ事务消息方案中引入Op消息的概念,用Op消息标识事务消息已经确定状态(commit或者Rollback)
1.1 如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)
1.2 实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的Op消息的存储和对应关系
如何处理二阶段失败的消息?
3.1 如果出现网络问题导致Commit失败,Broker端对未确定状态的消息发起回查
(1)将消息发送到对应的Producer端(同一个Group的Producer)
(2)Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback
3.2 Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint
3.3 默认回查15次,超过则默认回滚该消息
- 自己没有实战过RocketMQ事务消息,本篇文章相当于是技术积累。在以后实现中积累方案有说的不对的地方帮忙指正,实战的场景欢迎大家多多讨论
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)