blockingqueue,blockingqueue实现生产者消费者

首页>>技术文档>>产品文档

  

  请求接收

  zookeeper中的ServerCnxn是负责网络通信的主要实现类blockingqueue,该类有两个具体的实现blockingqueue,分别是NIOServerCnxn和NettyServerCnxn,本文以NIOServerCnxn为例入手分析。

  我们从NIOServerCnxn这个类看起,正如描述所说:server会为每一个client创建一个该类的实例,负责处理来自该client的请求:

  

  接收的方法代码如下:

  

  关于SelectionKey

  zookeeper使用NIO作为传输媒介,SelectionKey作为Channel的句柄,此处仅用于感知事件或操作类型。

  readPayload

  这里会出现两种情况:NIOServerCnxn已经完成初始化和尚未初始化(没有初始化也可以读出请求内容)。

  对于未初始化时的请求,则当作创建回话的请求处理。代码如下:

  

  创建会话的请求处理

  对于一个创建会话的请求,服务端会进行如下几个步骤的处理:

  1.反序列化ConnectRequest实体

  2.判断是否是readOnly客户端,如果不是只读客户端,而当前服务器又是只读服务器,则抛出异常

  3.检查客户端携带的zxid,一般情况下,服务端的zxid是一定大于客户端的。所以一旦出现客户端zxid大于服务端的时候,会抛出异常;

  4.确定一个sessionTimeout

  5.如果客户端携带了一个sessionId,重连。否则为客户端创建一个sessionId

  整个流程非常清晰,源码如下

  

blockingqueue,blockingqueue实现生产者消费者

  关于zxid

  ZooKeeper Transaction Id:每次请求对应一个唯一的zxid,如果zxid a < zxid b ,则可以保证a一定发生在b之前。

  创建会话和会话管理

  创建会话需要进行如下几个步骤:

  1.生成一个sessionId

  2.在sessionTracker中注册会话

  3.激活会话

  4.为客户端生成一个session密码,作为客户端请求服务端集群中不同机器的凭证

  源码如下

  

  SessionTracker

  上述过程中,重点在于注册会话和激活会话,这个功能由SessionTracker完成,下面我们来具体分析一下。

  首先,SessionTracker是一个线程,它继承了Thread

  publicclassSessionTrackerImplextendsThreadimplementsSessionTracker

  run方法我们稍后再说,先记住它本身是一个线程就行了。

  第二,SessionTracker内部维护了几个集合,分别是

  

  当一个session生成时,它会被分别放到这三个集合中

  sessionById:key=sessionId,value=SessionImpl

  sessionsWithTimeout:key=sessionId,value=timeout

  这两个集合很好理解,但两者本身是冗余的,冗余则是为了提高查找效率

  sessionSets:这个集合里存放的是所有临近时间将要过期的session集合们

  比如所有t1时间附近即将过期的session集合会被放到同一个sessionSet1中,以key=t1,value=sessionSet1存入sessionSets。

  理解了上述三个集合,SessionTracker的工作原理就非常清晰了。

  生成session:

  首先将生成的session分别放入sessionsById和sessionsWithTimeout集合中,然后执行激活touchSession。

  

  激活session:

  首先通过roundToInterval方法协商出一个近似的过期时间,这可以保证所有拥有近似timeout的session被分到同一个桶中。

  如果不是首次激活,需要把session从旧桶中移除。

  然后再放到新的过期桶中。

  synchronized public boolean touchSession(long sessionId, int timeout) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.CLIENT_PING_TRACE_MASK,"SessionTrackerImpl --- Touch session: 0x"+ Long.toHexString(sessionId) + " with timeout " + timeout); } SessionImpl s = sessionsById.get(sessionId); // Return false, if the session doesn't exists or marked as closing if (s == null || s.isClosing()) { return false; } long expireTime = roundToInterval(System.currentTimeMillis() + timeout); if (s.tickTime >= expireTime) { // Nothing needs to be done return true; } SessionSet set = sessionSets.get(s.tickTime); if (set != null) { set.sessions.remove(s); } s.tickTime = expireTime; set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s); return true; }

  session失效:

  按照时间索引找到失效桶,然后把里边所有的session做失效处理,并重复执行以上步骤。

  

  创建会话流程结束

  如果是创建会话的请求,流程到这里就全部结束了。NIOServerCnxn完成了自身的初始化,并且为客户端创建了会话。

  客户端的下一次请求会进入readRequest分支

  if(!initialized) { readConnectRequest();} else{ readRequest();}

  事务请求的接收

  下面我们通过一个修改节点的请求为例,说明事务请求的处理流程。

  从ZookeeperServer.processPacket方法看起

  首先,还是先反序列化一个RequestHeader对象(注意这里是RequestHeader,而processConnectRequest则是反序列化一个ConnectRequest),

  然后会根据请求类型不同走向不同的分支(具体类型在ZooDefs.OpCode有定义,在此不一一列举),这里只分析一个普通的更新事务流程。

  请求会进入构造一个普通的Request对象,然后进入submitRequest(Request si)方法。这个方法下边细说,先看processPacket的最后一步:

  cnxn.incrOutstandingRequests(h)

  这个方法的源码如下:

  protected void incrOutstandingRequests(RequestHeader h) { if (h.getXid() >= 0) { synchronized (this) { outstandingRequests++; } synchronized (this.factory) { // check throttling if (zkServer.getInProcess() > outstandingLimit) { if (LOG.isDebugEnabled()) { LOG.debug("Throttling recv " + zkServer.getInProcess()); } disableRecv(); // following lines should not be needed since we are // already reading // } else { // enableRecv(); } } } }

  这个方法的意思是:给“处理中的请求”计数器++,然后判断处理中的请求数量是否达到了配置上限outstandingLimit,

  如果超过了配置阈值,则执行disableRecv();顾名思义,拒绝接收新的请求,具体的处理方式是通过SelectionKey关闭对应的Channel。

  这里顺便说一下,outstandingLimit参数是针对单台服务器而言,设定太大可能会导致内存溢出。

  当一个请求被处理完时,会再次检测这个值,如果满足要求,会执行enableRecv();重新打开通道。

  具体代码在sendResponse(ReplyHeader h, Record r, String tag)方法的最后:

  if (h.getXid() > 0) { synchronized(this){ outstandingRequests--; } // check throttling synchronized (this.factory) { if (zkServer.getInProcess() < outstandingLimit || outstandingRequests < 1) { sk.selector().wakeup(); enableRecv(); } } }

  Leader服务器的处理责任链

  对于不同的Zookeeper服务器角色,请求处理责任链是不一样的,下面以Leader服务器为例分析。

  Leader服务器责任链构造如下:

  @OverrideprotectedvoidsetupRequestProcessors(){ RequestProcessor finalProcessor = newFinalRequestProcessor( this); RequestProcessor toBeAppliedProcessor = newLeader.ToBeAppliedRequestProcessor( finalProcessor, getLeader().toBeApplied); commitProcessor = newCommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = newProposalRequestProcessor( this, commitProcessor); proposalProcessor.initialize(); firstProcessor = newPrepRequestProcessor( this, proposalProcessor); ((PrepRequestProcessor)firstProcessor).start(); }

  在把请求交给责任链之前,Leader本身会做一些处理,具体处理代码在ZookeeperServer.submitRequest(Request si)方法中,这个方法一共干了三件事:

  1.校验服务器是否正常启动

  2.激活session

  3.把请求交给责任链的第一环:firstProcessor,即PrepRequestProcessor。

  代码如下:

  public void submitRequest(Request si) { if (firstProcessor == null) { synchronized (this) { try { while (!running) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Dropping packet at server of type " + si.type); // if invalid packet drop the packet. } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } }

  下面来依次分析这些Processor。

  预处理:PrepRequestProcessor

  大部分RequestProcessor的结构和工作方式都很类似,现在这里统一说明一下

  它们都会在内部维护一个待处理请求的队列:

  LinkedBlockingQueue<Request> submittedRequests = newLinkedBlockingQueue<Request>();

  RequestProcessor们本身是一个线程,会不停地从自己的队列里取出待处理的Request,处理,然后交给nextProcessor。

  可以看到,processRequest的方法非常简单,只做了一件事,就是把Request放入队列,然后及时释放线程。

  public void processRequest(Request request) { // request.addRQRec(">prep="+zks.outstandingChanges.size()); submittedRequests.add(request); }

  回到PrepRequestProcessor,看看它的run方法:

  从队列里取出一个Request,交给pRequest方法执行

  @Override public void run() { try { while (true) { Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } if (Request.requestOfDeath == request) { break; } pRequest(request); }

  以setDate请求为例,最终会调用pRequest2Txn方法。

  pRequest2Txn方法中干了这么几件事:

  1.session校验;

  2.反序列化SetDataRequest,从而得到节点和路径;

  3.校验节点的权限;

  4.校验版本号,版本号+1;

  5.拷贝出一份目标节点的副本,加入到outstandingChanges和outstandingChangesForPath中。

  caseOpCode.setData: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetDataRequest setDataRequest = (SetDataRequest)record; if(deserialize) ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest); path = setDataRequest.getPath(); nodeRecord = getRecordForPath(path); checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,request.authInfo); version = setDataRequest.getVersion(); intcurrentVersion = nodeRecord.stat.getVersion(); if(version != - 1&& version != currentVersion) { thrownewKeeperException.BadVersionException(path); } version = currentVersion + 1; request.txn = newSetDataTxn(path, setDataRequest.getData(), version); nodeRecord = nodeRecord.duplicate(request.hdr.getZxid()); nodeRecord.stat.setVersion(version); addChangeRecord(nodeRecord); break;

  上述几件是做完以后,作为一个setData类型的请求,PrepRequestProcessor的职责就算完成了(注意此时并没有对目标节点进行真正的修改),接下来的工作会交给ProposalRequestProcessor 处理。

  下面我们分析一下为什么需要把待更改的节点放到outstandingChanges和outstandingChangesForPath中,这需要从事务和回滚说起。

  事务的预处理

  先看一下预处理阶段如何处理事务:

  1.首先取出统一批次的事务;

  2.检查事务涉及修改哪些节点,把这些节点备份出来,回滚时使用(HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest););

  3.将事务中的Request取出来依次执行预处理,当某一个Request预处理失败时,后边的Request都不处理了;

  4.如果预处理失败,将之前执行预处理成功的Request进行回滚。

  源码如下:

  

  事务回滚

  从备份的节点中找到事务修改的节点们,进行还原。步骤如下:

  1.取得所有已改变的节点集合的最后一个,向前遍历(这个集合里可能会存在非待会滚的事务修改的节点);

  2.如果待修改节点的事务编号等于待会滚事务的编号,将该节点直接从集合中移除;

  3.遇到第一个非本事务修改的节点时,跳出遍历;

  4.如果待修改节点的集合不为空(说明有其他修改节点的请求),并且备份的节点事务编号>待修改节点的事务编号,使用备份进行还原。

  源码如下:

  

  为了满足以上第4条,需要在备份的时候备份到最新的节点事务编号。

  保证的方式很简单,直接从outstandingChanges里取最后一个就可以了(这就是为什么一个普通修改节点的请求,也会把待修改的节点放到outstandingChanges中)。

  最后来看一下备份的代码:

  /** * Grab current pending change records for each op in a multi-op. * * This is used inside MultiOp error code path to rollback in the event * of a failed multi-op. * * @parammultiRequest */HashMap<String, ChangeRecord> getPendingChanges(MultiTransactionRecord multiRequest){ HashMap<String, ChangeRecord> pendingChangeRecords = newHashMap<String, ChangeRecord>(); for(Op op: multiRequest) { String path = op.getPath(); try{ ChangeRecord cr = getRecordForPath(path); if(cr != null) { pendingChangeRecords.put(path, cr); } } catch(KeeperException.NoNodeException e) { // ignore this one} } returnpendingChangeRecords; } ChangeRecord getRecordForPath(String path)throwsKeeperException.NoNodeException { ChangeRecord lastChange = null; synchronized(zks.outstandingChanges) { lastChange = zks.outstandingChangesForPath.get(path); /* for (int i = 0; i < zks.outstandingChanges.size(); i++) { ChangeRecord c = zks.outstandingChanges.get(i); if (c.path.equals(path)) { lastChange = c; } } */if(lastChange == null) { DataNode n = zks.getZKDatabase().getNode(path); if(n != null) { Long acl; Set<String> children; synchronized(n) { acl = n.acl; children = n.getChildren(); } lastChange = newChangeRecord(- 1, path, n.stat, children != null? children.size() : 0, zks.getZKDatabase().convertLong(acl)); } } } if(lastChange == null|| lastChange.stat == null) { thrownewKeeperException.NoNodeException(path); } returnlastChange; }

  ProposalRequestProcessor

  ProposalRequestProcessor的功能相对简单。

  它会先将Request交给下一个Processor,然后在做自己的事,包括:

blockingqueue,blockingqueue实现生产者消费者

  1.将请求封装成一个Proposal(提案),发送给所有的Follower;

  2.调用SyncRequestProcessor记录日志和快照。

  代码如下:

  public void processRequest(Request request) throws RequestProcessorException { // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " + // request.type + " id = " + request.sessionId); // request.addRQRec(">prop"); /* In the following IF-THEN-ELSE block, we process syncs on the leader. * If the sync is coming from a follower, then the follower * handler adds it to syncHandler. Otherwise, if it is a client of * the leader that issued the sync command, then syncHandler won't * contain the handler. In this case, we add it to syncHandler, and * call processRequest on the next processor. */ if(request instanceof LearnerSyncRequest){ zks.getLeader().processSync((LearnerSyncRequest)request); } else { nextProcessor.processRequest(request); if (request.hdr != null) { // We need to sync and get consensus on any transactions try { zks.getLeader().propose(request); } catch (XidRolloverException e) { throw new RequestProcessorException(e.getMessage(), e); } syncProcessor.processRequest(request); } } }

  SyncRequestProcessor

  SyncRequestProcessor和PrepRequestProcessor类似,也是线程+队列的处理方式,这里就不赘述了。

  为了便于理解,在介绍这个类之前先简单补充一下:

  Zookeeper的数据存储方式

  Zookeeper在启动时会把数据加载到内存数据库中,并通过2种方式向磁盘刷写数据:

  1.当某一事物提交时,会记录一条log,写入到对应的log文件中。(log文件是多个,并且以该log中记录的第一条事务id命名,比如第1条log和第99条log都会记录在log.1文件中,第100条log会记录在log.100文件中);

  2.当1重复进行一定次数后,服务器会把内存数据库的数据全量dump到磁盘上,这一过程称为快照(snapshot);

  3.服务器最多保存snapCount条未进行快照的log,每当接到请求时,logCount++,如果logCount>(snapCount / 2 + randRoll),进行快照。这说明当log累积超过配置数量的一半时,才有可能进行快照;

  4.随机数randRoll保证了所有server不会在同一时间进行快照操作。

  BlockingQueue的take()和poll()的区别

  使用take()函数,如果队列中没有数据,则线程wait释放CPU,而poll()则不会等待,直接返回null

  SyncRequestProcessor的工作流程

  线程的run方法简要流程如下:

  1.生成一个随机数;

  2.如果待写入队列(toFlush)是空的(说明当前写请求较少,线程比较闲),尝试从待处理队列中获取一个Request(如果没有获取到,说明线程太闲了,直接wait挂起);

  3.如果待写入队列不为空,但是待处理队列是空,则处理写入;

  4.如果待写入队列是空的,待处理队列不是空的(成功获取到了一个Request),并且是写请求,记录一条log(注意,此时只是appeng,还没有commit);

  zks.getZKDatabase().append(si)

  5.如果满足了快照条件,记录快照,同时清空等待快照的log数量;

  6.如果是读请求,直接放给下一个Processor处理;

  7.把Request加入到toFlush队列,如果toFlush.size() > 1000,执行flish。

  代码如下:

  

  flush

  正式提交的flush方法流程:

  1.如果队列为空,返回;

  2.将之前append的log正式commit(没错,只是log);

  3.从队列中移除Request,并交给下一个Processor处理(CommitRequestProcessor)。

  代码如下:

  private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException { if (toFlush.isEmpty()) return; zks.getZKDatabase().commit(); while (!toFlush.isEmpty()) { Request i = toFlush.remove(); nextProcessor.processRequest(i); } if (nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } }

  CommitProcessor

  在ProposalRequestProcessor中,广播的Proposal被Follower服务器收到后,会给Leader服务器一个反馈(ack)。

  CommitProcessor中有两个队列,分别是Leader服务器自己从上游Processor传来的Request,和Follower服务器发来的通过提案的Request。

  /** * Requests that we are holding until the commit comes in. */LinkedList<Request> queuedRequests = newLinkedList<Request>(); /** * Requests that have been committed. */LinkedList<Request> committedRequests = newLinkedList<Request>();

  CommitProcessor的功能就是将这两个队列中的Request进行配对(相当于投票通过的Request),然后交给下一个Processor处理。

  

  Leader.ToBeAppliedRequestProcessor

  Request走到这里,说明Leader服务器和Follower服务器已经接受了这个提案,但是并没有正式生效。

  ToBeAppliedRequestProcessor是提案正式生效前的最后一个Processor,如果不是,构造函数会抛出异常。

  /** * This request processor simply maintains the toBeApplied list. For * this to work next must be a FinalRequestProcessor and * FinalRequestProcessor.processRequest MUST process the request * synchronously! * * @paramnext * a reference to the FinalRequestProcessor */ToBeAppliedRequestProcessor(RequestProcessor next, ConcurrentLinkedQueue<Proposal> toBeApplied) { if(!(next instanceofFinalRequestProcessor)) { thrownewRuntimeException(ToBeAppliedRequestProcessor.class .getName() + " must be connected to "+ FinalRequestProcessor.class.getName() + " not "+ next.getClass().getName()); } this.toBeApplied = toBeApplied; this.next = next; }

  ToBeAppliedRequestProcessor的工作也非常简单:在正式生效之前,leader还需要最后校验一遍Request的顺序,防止顺序错乱。

  public void processRequest(Request request) throws RequestProcessorException { // request.addRQRec(">tobe"); next.processRequest(request); Proposal p = toBeApplied.peek(); if (p != null && p.request != null && p.request.zxid == request.zxid) { toBeApplied.remove(); } }

  之后,请求就会走到最后一个Processor,正式生效。

  FinalRequestProcessor

  FinalRequestProcessor是责任链的最后一个Processor,结构很简单,没有队列和内部线程,只有一个长长的processRequest方法。

  下面就来进行分段分析。

  先看这段同步代码块:

  1.从头开始遍历outstandingChanges队列,移除所有事务id小于当前请求的事务id(这些是已经处理过的过期的ChangeRecord);

  2.从Request中获取请求头和事务,处理事务(下边详细说);

  3.如果是一个事务请求,执行addCommittedProposal(添加一个已提交的提案)方法(下边详细说)。

  synchronized(zks.outstandingChanges) { while(!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.get( 0).zxid <= request.zxid) { ChangeRecord cr = zks.outstandingChanges.remove( 0); if(cr.zxid < request.zxid) { LOG.warn( "Zxid outstanding "+ cr.zxid + " is less than current "+ request.zxid); } if(zks.outstandingChangesForPath.get(cr.path) == cr) { zks.outstandingChangesForPath.remove(cr.path); } } if(request.hdr != null) { TxnHeader hdr = request.hdr; Record txn = request.txn; rc = zks.processTxn(hdr, txn); } // do not add non quorum packets to the queue.if(Request.isQuorum(request.type)) { zks.getZKDatabase().addCommittedProposal(request); } }

  DataTree.processTxn

  zks.processTxn(hdr, txn);最终会调用到DataTree.processTxn()方法,过程很简单:

  1.更改节点;

  2.更新最新事务id。

  这里就不贴代码了

  addCommittedProposal

  这个方法主要是向committedLog里插入一条操作日志。

  committedLog是一个存在于内存中的事务日志队列:

  protectedLinkedList<Proposal> committedLog = newLinkedList<Proposal>();

  存入过程很简单,这里就不分析了,需要说明一点的是:如果超过的队列限制,会移除第一个,然后追加在最后。

  具体代码如下:

  public void addCommittedProposal(Request request) { WriteLock wl = logLock.writeLock(); try { wl.lock(); if (committedLog.size() > commitLogCount) { committedLog.removeFirst(); minCommittedLog = committedLog.getFirst().packet.getZxid(); } if (committedLog.size() == 0) { minCommittedLog = request.zxid; maxCommittedLog = request.zxid; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); try { request.hdr.serialize(boa, "hdr"); if (request.txn != null) { request.txn.serialize(boa, "txn"); } baos.close(); } catch (IOException e) { LOG.error("This really should be impossible", e); } QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos.toByteArray(), null); Proposal p = new Proposal(); p.packet = pp; p.request = request; committedLog.add(p); maxCommittedLog = p.packet.getZxid(); } finally { wl.unlock(); } }

  SyncRequestProcessor的log和FinalRequestProcessor的log的区别在哪里

  这是一个很容易弄混的问题,在这里特别说明一下:

  SyncRequestProcessor的log是一个FileTxnSnapLog,这个log用于定期向磁盘刷写数据。

  这个log队列记录的事务编号一般来说会大于快照记录的事务编号,所以在服务器启动时,需要从快照和这个log两部分读取恢复数据。

  FinalRequestProcessor的log是一个LinkedList<Proposal>,它只存在于内存中,不会向磁盘刷写数据。并且它有容量限制,每次插入最新的,移除最老的(被移除的就真的被移除了,并不会像FileTxnSnapLog一样写入到磁盘上)。

  它的作用是当一个事务被通过时,Zookeeper集群里的其他Follower学习时,从这里获取数据。

  返回响应

  最后,服务器会更新一下处理时间,状态,然后通过ServerCnxn.sendResponse方法将响应发送给请求方。

  至此,一次事务请求的处理流程结束。

  long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); zks.serverStats().updateLatency(request.createTime); cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, System.currentTimeMillis()); try { cnxn.sendResponse(hdr, rsp, "response"); if (closeSession) { cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG",e); }

上一篇: 2024年将是最冷的一年,2024年将是最冷的一年湖北

下一篇: 对象存储设置日志存储的简单介绍