
OpCode.createSession & zks.finishSessionInit(request.cnxn, true);
FinalRequestProcessor
// Prologue of the request-processing excerpt: log the request, apply it,
// update server bookkeeping and set up the locals the reply code fills in.
LOG.debug("Processing request:: {}", request);
// Trace logging; ping requests are traced under their own mask.
if (LOG.isTraceEnabled())
{
    long traceMask = (request.type == OpCode.ping)
            ? ZooTrace.SERVER_PING_TRACE_MASK
            : ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
// A throttled request is not applied, so rc stays null for it.
ProcessTxnResult rc = request.isThrottled() ? null : applyRequest(request);
// Only requests that carry a connection get a reply; stop here otherwise.
if (request.cnxn == null)
{
    return;
}
// Connection the reply will be written to.
ServerCnxn cnxn = request.cnxn;
// Last processed zxid of the data tree, echoed back in the reply header.
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
String lastOp = "NA";
// Server-side bookkeeping: one fewer request in flight, request finished.
zks.decInProcess();
zks.requestFinished(request);
// Give back the bytes accounted for a large request (-1 means "not a large request").
int largeBytes = request.getLargeRequestSize();
if (largeBytes != -1)
{
    currentLargeRequestBytes.addAndGet(-largeBytes);
}
// Reply state, filled in by the per-opcode handling that follows.
Code err = Code.OK;
Record rsp = null;
String path = null;
int responseSize = 0;
try
{
// 如果请求的事务头不是null
// 且 请求的事务头的类型为错误
// 表示请求处理出错了
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error)
{
AuditHelper.addAuditLog(request, rc, true);
// 获取请求中的异常对象
if (request.getException() != null)
{
// 再次抛出
throw request.getException();
}
else
{
// 构造异常对象并抛出
throw KeeperException.
create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
}
// 获取请求中的异常对象
KeeperException ke = request.getException();
// 如果是会话移动异常
if (ke instanceof SessionMovedException)
{
// 再次抛出
throw ke;
}
// 如果请求存在异常对象,且不是复合请求
if (ke != null && request.type != OpCode.multi)
{
// 再次抛出
throw ke;
}
LOG.debug("{}", request);
// 如果请求已经过期,
if (request.isStale())
{
// 统计
ServerMetrics.getMetrics().STALE_REPLIES.add(1);
}
// 如果请求被过滤
if (request.isThrottled())
{
// 抛出异常
throw KeeperException.create(Code.THROTTLEDOP);
}
AuditHelper.addAuditLog(request, rc);
// 请求类型
switch (request.type)
{
// 请求为ping
case OpCode.ping:
{
lastOp = "PING";
updateStats(request, lastOp, lastZxid);
// 发送回复
responseSize = cnxn.sendResponse(
new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
return;
}
// 请求为createSession
case OpCode.createSession:
{
lastOp = "SESS";
updateStats(request, lastOp, lastZxid);
zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi:
{
lastOp = "MULT";
rsp = new MultiResponse();
for (ProcessTxnResult subTxnResult : rc.multiResult)
{
OpResult subResult;
switch (subTxnResult.type)
{
case OpCode.check:
subResult = new CheckResult();
break;
case OpCode.create:
subResult = new CreateResult(subTxnResult.path);
break;
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
break;
case OpCode.delete:
case OpCode.deleteContainer:
subResult = new DeleteResult();
break;
case OpCode.setdata:
subResult = new SetDataResult(subTxnResult.stat);
break;
case OpCode.error:
subResult = new ErrorResult(subTxnResult.err);
if (subTxnResult.err == Code.SESSIONMOVED.intValue())
{
throw new SessionMovedException();
}
break;
default:
throw new IOException("Invalid type of op");
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.multiRead:
{
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = new MultiOperationRecord();
ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord)
{
try
{
Record rec;
switch (readOp.getType())
{
case OpCode.getChildren:
rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
break;
case OpCode.getdata:
rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
GetDataResponse gdr = (GetDataResponse) rec;
subResult = new GetDataResult(gdr.getData(), gdr.getStat());
break;
default:
throw new IOException("Invalid type of readOp");
}
}
catch (KeeperException e)
{
subResult = new ErrorResult(e.code().intValue());
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.create:
{
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
{
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.delete:
case OpCode.deleteContainer:
{
lastOp = "DELE";
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setdata:
{
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig:
{
lastOp = "RECO";
rsp = new GetDataResponse(((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(UTF_8), rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL:
{
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession:
{
lastOp = "CLOS";
err = Code.get(rc.err);
break;
}
case OpCode.sync:
{
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check:
{
lastOp = "CHEC";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists:
{
lastOp = "EXIS";
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
path = existsRequest.getPath();
if (path.indexOf('') != -1)
{
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getdata:
{
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.setWatches:
{
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), Collections.emptyList(), Collections.emptyList(), cnxn);
break;
}
case OpCode.setWatches2:
{
lastOp = "STW2";
SetWatches2 setWatches = new SetWatches2();
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), setWatches.getPersistentWatches(), setWatches.getPersistentRecursiveWatches(), cnxn);
break;
}
case OpCode.addWatch:
{
lastOp = "ADDW";
AddWatchRequest addWatcherRequest = new AddWatchRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, addWatcherRequest);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
}
case OpCode.getACL:
{
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null)
{
throw new KeeperException.NonodeException();
}
zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path, null);
Stat stat = new Stat();
List acl = zks.getZKDatabase().getACL(path, stat);
requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());
try
{
zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.ADMIN, request.authInfo, path, null);
rsp = new GetACLResponse(acl, stat);
}
catch (KeeperException.NoAuthException e)
{
List acl1 = new ArrayList(acl.size());
for (ACL a : acl)
{
if ("digest".equals(a.getId().getScheme()))
{
Id id = a.getId();
Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
acl1.add(new ACL(a.getPerms(), id1));
}
else
{
acl1.add(a);
}
}
rsp = new GetACLResponse(acl1, stat);
}
break;
}
case OpCode.getChildren:
{
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getAllChildrenNumber:
{
lastOp = "GETACN";
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null)
{
throw new KeeperException.NonodeException();
}
zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null);
int number = zks.getZKDatabase().getAllChildrenNumber(path);
rsp = new GetAllChildrenNumberResponse(number);
break;
}
case OpCode.getChildren2:
{
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null)
{
throw new KeeperException.NonodeException();
}
zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null);
List children = zks.getZKDatabase().getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.checkWatches:
{
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
if (!containsWatcher)
{
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
break;
}
case OpCode.removeWatches:
{
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
if (!removed)
{
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
break;
}
case OpCode.whoAmI:
{
lastOp = "HOMI";
rsp = new WhoAmIResponse(AuthUtil.getClientInfos(request.authInfo));
break;
}
case OpCode.getEphemerals:
{
lastOp = "GETE";
GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
String prefixPath = getEphemerals.getPrefixPath();
Set allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List ephemerals = new ArrayList<>();
if (prefixPath == null || prefixPath.trim().isEmpty() || "/".equals(prefixPath.trim()))
{
ephemerals.addAll(allEphems);
}
else
{
for (String p : allEphems)
{
if (p.startsWith(prefixPath))
{
ephemerals.add(p);
}
}
}
rsp = new GetEphemeralsResponse(ephemerals);
break;
}
}
}
catch (SessionMovedException e)
{
cnxn.sendCloseSession();
return;
}
catch (KeeperException e)
{
err = e.code();
}
catch (Exception e)
{
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining())
{
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
err = Code.MARSHALLINGERROR;
}
// Reply phase: build the header from the client xid, the last processed zxid
// and the error code, record statistics, then write the response.
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
updateStats(request, lastOp, lastZxid);
try
{
    final int opCode = request.type;
    // getdata and getChildren2 replies that have both a path and a response
    // record are sent through the overload that also takes path, stat and opcode.
    final boolean sendWithPathAndStat = path != null && rsp != null
            && (opCode == OpCode.getdata || opCode == OpCode.getChildren2);
    if (sendWithPathAndStat)
    {
        Stat stat = (opCode == OpCode.getdata)
                ? ((GetDataResponse) rsp).getStat()
                : ((GetChildren2Response) rsp).getStat();
        responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
    }
    else
    {
        responseSize = cnxn.sendResponse(hdr, rsp, "response");
    }
    // After answering a closeSession request, also send the close-session marker.
    if (request.type == OpCode.closeSession)
    {
        cnxn.sendCloseSession();
    }
}
catch (IOException e)
{
    LOG.error("FIXMSG", e);
}
finally
{
    // Record the response size even when sending failed (it is 0 in that case).
    ServerMetrics.getMetrics().RESPONSE_BYTES.add(responseSize);
}
applyRequest
// Apply the request through the server; rc is returned on every path.
ProcessTxnResult rc = zks.processTxn(request);
// closeSession for a connection the client itself closed: if either the plain
// or the secure connection factory closes the session, return right away.
boolean closedByClient = request.type == OpCode.closeSession && connClosedByClient(request);
if (closedByClient
        && (closeSession(zks.serverCnxnFactory, request.sessionId)
            || closeSession(zks.secureServerCnxnFactory, request.sessionId)))
{
    return rc;
}
// Requests without a transaction header (non-writes) record no latency.
if (request.getHdr() == null)
{
    return rc;
}
// Write request: record the time elapsed since the header's timestamp,
// skipping negative values.
long elapsed = Time.currentWallTime() - request.getHdr().getTime();
if (elapsed >= 0)
{
    ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(elapsed);
}
return rc;
finishSessionInit
// Step 1: for a valid session, register the connection with whichever
// connection factory (plain or secure) currently tracks it.
try
{
    if (valid && serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn))
    {
        serverCnxnFactory.registerConnection(cnxn);
    }
    else if (valid && secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn))
    {
        secureServerCnxnFactory.registerConnection(cnxn);
    }
}
catch (Exception e)
{
    LOG.warn("Failed to register with JMX", e);
}
// Step 2: serialize and send the ConnectResponse, then either enable
// receiving (valid session) or send the close marker (invalid session).
try
{
    // An invalid session gets zero timeout, zero id and an all-zero 16-byte password.
    int negotiatedTimeout = valid ? cnxn.getSessionTimeout() : 0;
    long sessionId = valid ? cnxn.getSessionId() : 0;
    byte[] passwd = valid ? generatePasswd(cnxn.getSessionId()) : new byte[16];
    ConnectResponse rsp = new ConnectResponse(0, negotiatedTimeout, sessionId, passwd);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
    // Placeholder for the length prefix; overwritten once the payload size is known.
    bos.writeInt(-1, "len");
    rsp.serialize(bos, "connect");
    // Clients not flagged as old additionally receive the read-only flag.
    if (!cnxn.isOldClient)
    {
        bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
    }
    baos.close();

    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    // Overwrite the placeholder with the payload length (total minus the
    // 4-byte prefix), then reset the position for sending.
    bb.putInt(bb.remaining() - 4);
    bb.rewind();
    cnxn.sendBuffer(bb);

    if (!valid)
    {
        LOG.info("Invalid session 0x{} for client {}, probably expired",
                Long.toHexString(cnxn.getSessionId()), cnxn.getRemoteSocketAddress());
        // Send the close-connection marker.
        cnxn.sendBuffer(ServerCnxnFactory.closeConn);
    }
    else
    {
        LOG.debug("Established session 0x{} with negotiated timeout {} for client {}",
                Long.toHexString(cnxn.getSessionId()), cnxn.getSessionTimeout(),
                cnxn.getRemoteSocketAddress());
        // Start receiving data on the connection.
        cnxn.enableRecv();
    }
}
catch (Exception e)
{
    LOG.warn("Exception while establishing session, closing", e);
    cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
}
processTxn
// Transaction header of the request; it is non-null only for write requests.
TxnHeader hdr = request.getHdr();
// Session bookkeeping for createSession / closeSession.
processTxnForSessionEvents(request, hdr, request.getTxn());
// Write request: the request carries a transaction header.
final boolean writeRequest = (hdr != null);
// Quorum request, as classified by Request.isQuorum().
final boolean quorumRequest = request.isQuorum();
// Neither a write nor a quorum request: nothing to apply, so return an
// empty result without taking the lock.
if (!writeRequest && !quorumRequest)
{
    return new ProcessTxnResult();
}
synchronized (outstandingChanges)
{
    // Apply the transaction to the database. The original excerpt pasted the
    // callee bodies inline at this point, which made every path return before
    // the cleanup below; they are kept here as a call-chain note instead:
    //   processTxnInDB(hdr, txn, digest)
    //     hdr == null -> return new ProcessTxnResult()
    //     otherwise   -> return getZKDatabase().processTxn(hdr, txn, digest)
    //       -> dataTree.processTxn(hdr, txn, digest)
    //            result = processTxn(header, txn)   // i.e. this.processTxn(header, txn, false)
    //            compareDigest(header, txn, digest)
    //            return result
    ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
    if (writeRequest)
    {
        long zxid = hdr.getZxid();
        // Drop every outstanding change whose zxid is not greater than this
        // transaction's zxid: those changes have now been applied.
        while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid)
        {
            ChangeRecord cr = outstandingChanges.remove();
            ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
            if (cr.zxid < zxid)
            {
                LOG.warn("Zxid outstanding 0x{} is less than current 0x{}",
                        Long.toHexString(cr.zxid), Long.toHexString(zxid));
            }
            // Remove the per-path entry only if it still points at this exact
            // record; a different record for the same path stays in place.
            if (outstandingChangesForPath.get(cr.path) == cr)
            {
                outstandingChangesForPath.remove(cr.path);
            }
        }
    }
    // Only quorum requests are added to the committed-proposal log.
    if (quorumRequest)
    {
        getZKDatabase().addCommittedProposal(request);
    }
    return rc;
}
addCommittedProposal
对最近一定数量的已经完成最后处理的请求对象,
构造关联提议对象,
将其存储起来,以备需要时使用
// Append the finished request to the bounded committed-proposal log while
// holding the write lock, keeping minCommittedLog / maxCommittedLog in step.
WriteLock wl = logLock.writeLock();
// Acquire before entering try: if lock() were to fail inside the try block,
// the finally clause would call unlock() on a lock this thread does not hold.
wl.lock();
try
{
    // More entries than commitLogCount: evict the oldest one and take the new
    // minimum zxid from the packet of the entry now at the head.
    if (committedLog.size() > commitLogCount)
    {
        committedLog.remove();
        minCommittedLog = committedLog.peek().packet.getZxid();
    }
    // Empty log: both bounds start at the zxid of this request.
    if (committedLog.isEmpty())
    {
        minCommittedLog = request.zxid;
        maxCommittedLog = request.zxid;
    }
    // Serialize the request and wrap it in a PROPOSAL quorum packet that
    // carries the request's zxid.
    byte[] data = SerializeUtils.serializeRequest(request);
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
    // Build the proposal that pairs the packet with the request.
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    // Store it and advance the maximum committed zxid.
    committedLog.add(p);
    maxCommittedLog = p.packet.getZxid();
}
finally
{
    wl.unlock();
}
processTxnForSessionEvents(request, hdr, request.getTxn());
// Operation type and session id come from the request when there is one,
// otherwise from the transaction header.
final int opCode = (request != null) ? request.type : hdr.getType();
final long sessionId = (request != null) ? request.sessionId : hdr.getClientId();
switch (opCode)
{
    case OpCode.createSession:
    {
        if (hdr != null && txn instanceof CreateSessionTxn)
        {
            // Commit the session with the timeout carried by the transaction.
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            sessionTracker.commitSession(sessionId, cst.getTimeOut());
        }
        else if (request == null || !request.isLocalSession())
        {
            // Unexpected transaction for a createSession with no request or a
            // non-local session: only log it.
            LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString());
        }
        break;
    }
    case OpCode.closeSession:
    {
        // Remove the session from the session tracker.
        sessionTracker.removeSession(sessionId);
        break;
    }
    default:
        // Other operation types need no session bookkeeping here.
        break;
}
集群请求判断
/**
 * Tells whether this request is a quorum request, based on its operation type.
 *
 * @return {@code true} for the write-type operations listed below, the negation
 *         of {@code isLocalSession} for session create/close, and {@code false}
 *         for read-type operations and any unlisted type
 */
public boolean isQuorum()
{
    final boolean quorum;
    switch (this.type)
    {
        // Write-type operations.
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer:
        case OpCode.error:
        case OpCode.delete:
        case OpCode.deleteContainer:
        case OpCode.setACL:
        case OpCode.setdata:
        case OpCode.check:
        case OpCode.multi:
        case OpCode.reconfig:
            quorum = true;
            break;
        // Session create/close: quorum unless the session is local.
        case OpCode.closeSession:
        case OpCode.createSession:
            quorum = !this.isLocalSession;
            break;
        // Read-type operations, and every type not listed above.
        case OpCode.exists:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getAllChildrenNumber:
        case OpCode.getChildren2:
        case OpCode.getdata:
        case OpCode.getEphemerals:
        case OpCode.multiRead:
        case OpCode.whoAmI:
        default:
            quorum = false;
            break;
    }
    return quorum;
}
processTxn
利用事务头,事务体,实际处理写请求
// Applies one transaction (header + body) to the data tree and returns the
// outcome. For multi transactions every sub-transaction is applied through a
// recursive call with isSubTxn = true.
ProcessTxnResult rc = new ProcessTxnResult();
try
{
    // Copy the identifying fields from the transaction header.
    rc.clientId = header.getClientId();
    rc.cxid = header.getCxid();
    rc.zxid = header.getZxid();
    rc.type = header.getType();
    // No error so far; multiResult is only populated for multi transactions.
    rc.err = 0;
    rc.multiResult = null;
    switch (header.getType())
    {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(
                    createTxn.getPath(),
                    createTxn.getData(),
                    createTxn.getAcl(),
                    // Ephemeral nodes are owned by the creating client's id; others get 0.
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    // No Stat object is requested for a plain create.
                    null);
            break;
        case OpCode.create2:
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(
                    create2Txn.getPath(),
                    create2Txn.getData(),
                    create2Txn.getAcl(),
                    create2Txn.getEphemeral() ? header.getClientId() : 0,
                    create2Txn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
            // create2 also hands the Stat passed to createNode back in the result.
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(
                    createTtlTxn.getPath(),
                    createTtlTxn.getData(),
                    createTtlTxn.getAcl(),
                    // The owner value is derived from the TTL by EphemeralType.TTL.
                    EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                    createTtlTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(
                    createContainerTxn.getPath(),
                    createContainerTxn.getData(),
                    createContainerTxn.getAcl(),
                    EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                    createContainerTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
            rc.stat = stat;
            break;
        case OpCode.delete:
        case OpCode.deleteContainer:
            DeleteTxn deleteTxn = (DeleteTxn) txn;
            rc.path = deleteTxn.getPath();
            deleteNode(deleteTxn.getPath(), header.getZxid());
            break;
        case OpCode.reconfig:
        case OpCode.setdata:
            // reconfig is applied exactly like setdata.
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(
                    setDataTxn.getPath(),
                    setDataTxn.getData(),
                    setDataTxn.getVersion(),
                    header.getZxid(),
                    header.getTime());
            break;
        case OpCode.setACL:
            SetACLTxn setACLTxn = (SetACLTxn) txn;
            rc.path = setACLTxn.getPath();
            rc.stat = setACL(
                    setACLTxn.getPath(),
                    setACLTxn.getAcl(),
                    setACLTxn.getVersion());
            break;
        case OpCode.closeSession:
            long sessionId = header.getClientId();
            if (txn != null)
            {
                // The transaction body lists the paths to delete for this session.
                killSession(
                        sessionId,
                        header.getZxid(),
                        ephemerals.remove(sessionId),
                        ((CloseSessionTxn) txn).getPaths2Delete());
            }
            else
            {
                killSession(sessionId, header.getZxid());
            }
            break;
        case OpCode.error:
            ErrorTxn errTxn = (ErrorTxn) txn;
            // Carry the recorded error code into the result.
            rc.err = errTxn.getErr();
            break;
        case OpCode.check:
            // Only the path is recorded for a check; the tree is not modified here.
            CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
            rc.path = checkTxn.getPath();
            break;
        case OpCode.multi:
            // Composite transaction: unpack its sub-transactions.
            MultiTxn multiTxn = (MultiTxn) txn;
            List<Txn> txns = multiTxn.getTxns();
            // One result per sub-transaction.
            rc.multiResult = new ArrayList<>();
            boolean failed = false;
            // First pass: if any sub-transaction is already an error, the whole
            // multi has failed.
            for (Txn subtxn : txns)
            {
                if (subtxn.getType() == OpCode.error)
                {
                    failed = true;
                    break;
                }
            }
            boolean post_failed = false;
            // Second pass: decode and apply each sub-transaction in order.
            for (Txn subtxn : txns)
            {
                ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
                Record record = null;
                // Pick the record class matching the sub-transaction type.
                switch (subtxn.getType())
                {
                    case OpCode.create:
                        record = new CreateTxn();
                        break;
                    case OpCode.createTTL:
                        record = new CreateTTLTxn();
                        break;
                    case OpCode.createContainer:
                        record = new CreateContainerTxn();
                        break;
                    case OpCode.delete:
                    case OpCode.deleteContainer:
                        record = new DeleteTxn();
                        break;
                    case OpCode.setdata:
                        record = new SetDataTxn();
                        break;
                    case OpCode.error:
                        record = new ErrorTxn();
                        // Remember that the error sub-transaction has been passed.
                        post_failed = true;
                        break;
                    case OpCode.check:
                        record = new CheckVersionTxn();
                        break;
                    default:
                        throw new IOException("Invalid type of op: " + subtxn.getType());
                }
                assert (record != null);
                // Deserialize the sub-transaction body into the record.
                ByteBufferInputStream.byteBuffer2Record(bb, record);
                // In a failed multi, every non-error sub-transaction is replaced
                // by an ErrorTxn: code OK before the error sub-transaction has
                // been seen, RUNTIMEINCONSISTENCY after it.
                if (failed && subtxn.getType() != OpCode.error)
                {
                    int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();
                    subtxn.setType(OpCode.error);
                    record = new ErrorTxn(ec);
                }
                assert !failed || (subtxn.getType() == OpCode.error);
                // Sub-header: the parent's client id, cxid, zxid and time, with
                // the sub-transaction's own type.
                TxnHeader subHdr = new TxnHeader(
                        header.getClientId(),
                        header.getCxid(),
                        header.getZxid(),
                        header.getTime(),
                        subtxn.getType());
                // Apply the sub-transaction, flagged as a sub-transaction.
                ProcessTxnResult subRc = processTxn(subHdr, record, true);
                rc.multiResult.add(subRc);
                // The first failing sub-result determines the overall error code.
                if (subRc.err != 0 && rc.err == 0)
                {
                    rc.err = subRc.err;
                }
            }
            break;
    }
}
catch (KeeperException e)
{
    LOG.debug("Failed: {}:{}", header, txn, e);
    rc.err = e.code().intValue();
}
catch (IOException e)
{
    // Only logged; rc.err is left unchanged on this path.
    LOG.debug("Failed: {}:{}", header, txn, e);
}
// A create that failed because the node already exists still updates the
// parent's cversion and pzxid.
if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue())
{
    LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err);
    int lastSlash = rc.path.lastIndexOf('/');
    String parentName = rc.path.substring(0, lastSlash);
    CreateTxn cTxn = (CreateTxn) txn;
    try
    {
        setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
    }
    catch (KeeperException.NonodeException e)
    {
        LOG.error("Failed to set parent cversion for: {}", parentName, e);
        rc.err = e.code().intValue();
    }
}
else if (rc.err != Code.OK.intValue())
{
    LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err);
}
// Top-level transactions only: advance lastProcessedZxid and handle digests.
if (!isSubTxn)
{
    if (rc.zxid > lastProcessedZxid)
    {
        lastProcessedZxid = rc.zxid;
    }
    if (digestFromLoadedSnapshot != null)
    {
        compareSnapshotDigests(rc.zxid);
    }
    else
    {
        logZxidDigest(rc.zxid, getTreeDigest());
    }
}
return rc;
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)