近来看了一些58的沈剑写的架构文章,觉得很是一般:就是把网络上常见的一些模型,配上自己写的小例子,博取一些对网络还不了解的人的赞赏。其实文章没有交代出什么东西来,支付宝那么大的架构肯定不是分几个库就分出来的。其中的中间件肯定不会告诉你,那都是集结了许多人的智慧才开发出来的。比如,无论是梁飞开发的dubbo,还是毕玄(林昊)开发的HSF,除了这两位知名的作者,背后都还有一个开发小组。我们要想进阶,只有不断阅读优秀的代码和含有先进思想的代码,才能写出高质量的代码。这东西就像憋尿一样,水喝多了,尿自然就有了。

这里通读一下hadoop的源码。首先,今天要看的是Server类,因为它涉及到了通信、序列化、线程操作、一致性,服务端最重要的一些特点在这个类里都展现得淋漓尽致,所以我决定从这个类开始看。Server类里有几个重要的模块:一个是Responder,一个是Listener,还有一个是Connection,分别对应返回(响应)线程、监听客户端连接的线程,以及连接本身,这三个内部类基本贯穿了它们之间的操作。Listener中又有一个内部类Reader,用于读取连接中传来的内容。

  /** Listens on the socket. Creates jobs for the handler threads*/

  private class Listener extends Thread {

    

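    // acceptChannel is the NIO channel this listener accepts connections on;
    // the selector watches that channel for events (it is registered for
    // OP_ACCEPT in the constructor).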

    // Server-side channel on which new client connections are accepted;
    // opened, switched to non-blocking mode and bound in the constructor.
    private ServerSocketChannel acceptChannel = null; //the accept channel

    // Selector the accept channel is registered with.
    private Selector selector = null; //the selector that we use for the server

    // Reader threads created and started by the constructor.
    private Reader[] readers = null;

    // Index of the reader most recently handed out by getReader().
    private int currentReader = 0;

    private InetSocketAddress address; //the address we bind at

    // Backlog value passed to bind(), read from the server configuration.
    private int backlogLength = conf.getInt(

        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,

        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);

    

    public Listener() throws IOException {

      address = new InetSocketAddress(bindAddress, port);

      // Create a new server socket and set to non blocking mode

      acceptChannel = ServerSocketChannel.open();

//设置为非阻塞 no-blocking

      acceptChannel.configureBlocking(false);


//最常见的监听端口操作

      // Bind the server socket to the local host and port

      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);

      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port

      // create a selector;

      selector= Selector.open();

      readers = new Reader[readThreads];

      for (int i = 0; i < readThreads; i++) {

        Reader reader = new Reader(

            "Socket Reader #" + (i + 1) + " for port " + port);

        readers[i] = reader;

        reader.start();

      }


      // Register accepts on the server socket with the selector.

      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);

      this.setName("IPC Server listener on " + port);

      this.setDaemon(true);

    }

    

    private class Reader extends Thread {

      final private BlockingQueue<Connection> pendingConnections;

      private final Selector readSelector;


      Reader(String name) throws IOException {

        super(name);


        this.pendingConnections =

            new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);

        this.readSelector = Selector.open();

      }

      

      @Override

      public void run() {

//Reader线程,一直循环着读取

        LOG.info("Starting " + Thread.currentThread().getName());

        try {

          doRunLoop();

        } finally {

          try {

            readSelector.close();

          } catch (IOException ioe) {

            LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);

          }

        }

      }


      private synchronized void doRunLoop() {

        while (running) {

          SelectionKey key = null;

          try {

            // consume as many connections as currently queued to avoid

            // unbridled acceptance of connections that starves the select

//避免无尽的连接,只读取当前数量的连接

            int size = pendingConnections.size();

            for (int i=size; i>0; i--) {

              Connection conn = pendingConnections.take();

              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);

            }

            readSelector.select();


            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();

            while (iter.hasNext()) {

              key = iter.next();

              iter.remove();

              if (key.isValid()) {

                if (key.isReadable()) {

                  doRead(key);

                }

              }

              key = null;

            }

          } catch (InterruptedException e) {

            if (running) {                      // unexpected -- log it

              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);

            }

          } catch (IOException ex) {

            LOG.error("Error in Reader", ex);

          }

        }

      }


      /**

       * Updating the readSelector while it's being used is not thread-safe,

       * so the connection must be queued.  The reader will drain the queue

       * and update its readSelector before performing the next select

       */

      public void addConnection(Connection conn) throws InterruptedException {

        pendingConnections.put(conn);

        readSelector.wakeup();

      }


      void shutdown() {

        assert !running;

        readSelector.wakeup();

        try {

          join();

        } catch (InterruptedException ie) {

          Thread.currentThread().interrupt();

        }

      }

    }


    @Override

    public void run() {

      LOG.info(Thread.currentThread().getName() + ": starting");

      SERVER.set(Server.this);

      connectionManager.startIdleScan();

      while (running) {

        SelectionKey key = null;

        try {

          getSelector().select();

          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();

          while (iter.hasNext()) {

            key = iter.next();

            iter.remove();

            try {

              if (key.isValid()) {

                if (key.isAcceptable())

//这里是Listener这个线程的主方法,就是不断读取当前频道内的selectedKey,

如果有连接过来的连接,则接受这个连接。

                  doAccept(key);

              }

            } catch (IOException e) {

            }

            key = null;

          }

        } catch (OutOfMemoryError e) {

          // we can run out of memory if we have too many threads

          // log the event and sleep for a minute and give 

          // some thread(s) a chance to finish

          LOG.warn("Out of Memory in server select", e);

          closeCurrentConnection(key, e);

          connectionManager.closeIdle(true);

          try { Thread.sleep(60000); } catch (Exception ie) {}

        } catch (Exception e) {

          closeCurrentConnection(key, e);

        }

      }

      LOG.info("Stopping " + Thread.currentThread().getName());


      synchronized (this) {

        try {

          acceptChannel.close();

          selector.close();

        } catch (IOException e) { }


        selector= null;

        acceptChannel= null;

        

        // close all connections

        connectionManager.stopIdleScan();

        connectionManager.closeAll();

      }

    }


    private void closeCurrentConnection(SelectionKey key, Throwable e) {

      if (key != null) {

        Connection c = (Connection)key.attachment();

        if (c != null) {

          closeConnection(c);

          c = null;

        }

      }

    }


    InetSocketAddress getAddress() {

      return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();

    }

    

    // doAccept is where the Listener does its real work: the listener
    // thread's main job is handling incoming connections.

    void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {

      ServerSocketChannel server = (ServerSocketChannel) key.channel();

      SocketChannel channel;

      while ((channel = server.accept()) != null) {


        channel.configureBlocking(false);

        channel.socket().setTcpNoDelay(tcpNoDelay);

        channel.socket().setKeepAlive(true);

        

        Reader reader = getReader();

        Connection c = connectionManager.register(channel);

        key.attach(c);  // so closeCurrentConnection can get the object

        reader.addConnection(c);

      }

    }


    void doRead(SelectionKey key) throws InterruptedException {

      int count = 0;

      Connection c = (Connection)key.attachment();

      if (c == null) {

        return;  

      }

      c.setLastContact(Time.now());

      

      try {

//读-处理这个方法在内部类connection中实现的,下面就会看到。

        count = c.readAndProcess();

      } catch (InterruptedException ieo) {

        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);

        throw ieo;

      } catch (Exception e) {

        // a WrappedRpcServerException is an exception that has been sent

        // to the client, so the stacktrace is unnecessary; any other

        // exceptions are unexpected internal server errors and thus the

        // stacktrace should be logged

        LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " +

            c.getHostAddress() + " threw exception [" + e + "]",

            (e instanceof WrappedRpcServerException) ? null : e);

        count = -1; //so that the (count < 0) block is executed

      }

      if (count < 0) {

        closeConnection(c);

        c = null;

      }

      else {

        c.setLastContact(Time.now());

      }

    }   


    synchronized void doStop() {

      if (selector != null) {

        selector.wakeup();

        Thread.yield();

      }

      if (acceptChannel != null) {

        try {

          acceptChannel.socket().close();

        } catch (IOException e) {

          LOG.info(Thread.currentThread().getName() + ":Exception in closing listener socket. " + e);

        }

      }

      for (Reader r : readers) {

        r.shutdown();

      }

    }

    

    synchronized Selector getSelector() { return selector; }

    // The method that will return the next reader to work with

    // Simplistic implementation of round robin for now

    Reader getReader() {

      currentReader = (currentReader + 1) % readers.length;

      return readers[currentReader];

    }

  }

读取通道(channel)内的数据,是在内部类Connection中实现的,下面阅读一下Connection类。


  /** Reads calls from a connection and queues them for handling. */

  public class Connection {

    private boolean connectionHeaderRead = false; // connection  header is read?

    private boolean connectionContextRead = false; //if connection context that

                                            //follows connection header is read


    // Channel this connection reads requests from.
    private SocketChannel channel;

    // Buffer for the body of the RPC currently being read; allocated once the
    // length is known and reset to null after each RPC (see readAndProcess).
    private ByteBuffer data;

    // 4-byte buffer (allocated in the constructor) holding the length prefix.
    private ByteBuffer dataLengthBuffer;

    // Call is another inner class (not shown in this section).
    // NOTE(review): presumably these are responses waiting to be written back
    // to this client -- confirm against Responder.
    private LinkedList<Call> responseQueue;

    private volatile int rpcCount = 0; // number of outstanding rpcs

    private long lastContact;

    private int dataLength;

    private Socket socket;

    // Cache the remote host & port info so that even if the socket is 
    // disconnected, we can say where it used to connect to.

    private String hostAddress;

    private int remotePort;

    private InetAddress addr;

    // IPC (inter-process communication) connection context; the type is a
    // protobuf message decoded in processConnectionContext.
    IpcConnectionContextProto connectionContext;

    String protocolName;

    SaslServer saslServer;

    private AuthMethod authMethod;

    private AuthProtocol authProtocol;

    // Set to true by saslProcess once SASL negotiation has completed.
    private boolean saslContextEstablished;

    private ByteBuffer connectionHeaderBuf = null;

    private ByteBuffer unwrappedData;

    private ByteBuffer unwrappedDataLengthBuffer;

    private int serviceClass;

    

    UserGroupInformation user = null;

    public UserGroupInformation attemptingUser = null; // user name before auth


    // Fake 'call' for failed authorization response

    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,

        RpcConstants.INVALID_RETRY_COUNT, null, this);

    private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();

    

    // Call and buffer reused by doSaslReply(Message) for SASL replies.
    private final Call saslCall = new Call(AuthProtocol.SASL.callId,

        RpcConstants.INVALID_RETRY_COUNT, null, this);

    private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();

    

    // True once a negotiate response has been built for this connection.
    private boolean sentNegotiate = false;

    // True when the negotiated QOP requires SASL wrapping (see saslProcess).
    private boolean useWrap = false;

    //与监听中的ServerSocketChannel是两个类

    public Connection(SocketChannel channel, long lastContact) {

      this.channel = channel;

      this.lastContact = lastContact;

      this.data = null;

      this.dataLengthBuffer = ByteBuffer.allocate(4);

      this.unwrappedData = null;

      this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);

      this.socket = channel.socket();

      this.addr = socket.getInetAddress();

      if (addr == null) {

        this.hostAddress = "*Unknown*";

      } else {

        this.hostAddress = addr.getHostAddress();

      }

      this.remotePort = socket.getPort();

      this.responseQueue = new LinkedList<Call>();

      if (socketSendBufferSize != 0) {

        try {

          socket.setSendBufferSize(socketSendBufferSize);

        } catch (IOException e) {

          LOG.warn("Connection: unable to set socket send buffer size to " +

                   socketSendBufferSize);

        }

      }

    }   


    @Override

    public String toString() {

      return getHostAddress() + ":" + remotePort; 

    }

    

    public String getHostAddress() {

      return hostAddress;

    }


    public InetAddress getHostInetAddress() {

      return addr;

    }

    

    public void setLastContact(long lastContact) {

      this.lastContact = lastContact;

    }


    public long getLastContact() {

      return lastContact;

    }


    // Idle means rpcCount == 0, i.e. no RPC on this connection is still outstanding.

    /* Return true if the connection has no outstanding rpc */

    private boolean isIdle() {

      return rpcCount == 0;

    }

    

    /* Decrement the outstanding RPC count */

    private void decRpcCount() {

      rpcCount--;

    }

    

    /* Increment the outstanding RPC count */

    private void incRpcCount() {

      rpcCount++;

    }

    

    private UserGroupInformation getAuthorizedUgi(String authorizedId)

        throws InvalidToken, AccessControlException {

      if (authMethod == AuthMethod.TOKEN) {

        TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,

            secretManager);

        UserGroupInformation ugi = tokenId.getUser();

        if (ugi == null) {

          throw new AccessControlException(

              "Can't retrieve username from tokenIdentifier.");

        }

        ugi.addTokenIdentifier(tokenId);

        return ugi;

      } else {

        return UserGroupInformation.createRemoteUser(authorizedId, authMethod);

      }

    }


    private void saslReadAndProcess(DataInputStream dis) throws

    WrappedRpcServerException, IOException, InterruptedException {

      final RpcSaslProto saslMessage =

          decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);

      switch (saslMessage.getState()) {

        case WRAP: {

          if (!saslContextEstablished || !useWrap) {

            throw new WrappedRpcServerException(

                RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,

                new SaslException("Server is not wrapping data"));

          }

          // loops over decoded data and calls processOneRpc

          unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());

          break;

        }

        default:

          saslProcess(saslMessage);

      }

    }


    private Throwable getCauseForInvalidToken(IOException e) {

      Throwable cause = e;

      while (cause != null) {

        if (cause instanceof RetriableException) {

          return cause;

        } else if (cause instanceof StandbyException) {

          return cause;

        } else if (cause instanceof InvalidToken) {

//hadoop的方法限制sasl的回调给返回无效的token,但是有些服务需要抛出其他异常

          // FIXME: hadoop method signatures are restricting the SASL

          // callbacks to only returning InvalidToken, but some services

          // need to throw other exceptions (ex. NN + StandyException),

          // so for now we'll tunnel the real exceptions via an

          // InvalidToken's cause which normally is not set 

          if (cause.getCause() != null) {

            cause = cause.getCause();

          }

          return cause;

        }

        cause = cause.getCause();

      }

      return e;

    }

    

    private void saslProcess(RpcSaslProto saslMessage)

        throws WrappedRpcServerException, IOException, InterruptedException {

      if (saslContextEstablished) {

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,

            new SaslException("Negotiation is already complete"));

      }

      RpcSaslProto saslResponse = null;

      try {

        try {

          saslResponse = processSaslMessage(saslMessage);

        } catch (IOException e) {

          rpcMetrics.incrAuthenticationFailures();

          // attempting user could be null

          AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"

              + attemptingUser + " (" + e.getLocalizedMessage() + ")");

          throw (IOException) getCauseForInvalidToken(e);

        }

        

        if (saslServer != null && saslServer.isComplete()) {

          if (LOG.isDebugEnabled()) {

            LOG.debug("SASL server context established. Negotiated QoP is "

                + saslServer.getNegotiatedProperty(Sasl.QOP));

          }

          user = getAuthorizedUgi(saslServer.getAuthorizationID());

          if (LOG.isDebugEnabled()) {

            LOG.debug("SASL server successfully authenticated client: " + user);

          }

          rpcMetrics.incrAuthenticationSuccesses();

          AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);

          saslContextEstablished = true;

        }

      } catch (WrappedRpcServerException wrse) { // don't re-wrap

        throw wrse;

      } catch (IOException ioe) {

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);

      }

      // send back response if any, may throw IOException

      if (saslResponse != null) {

        doSaslReply(saslResponse);

      }

      // do NOT enable wrapping until the last auth response is sent

      if (saslContextEstablished) {

        String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);

        // SASL wrapping is only used if the connection has a QOP, and

        // the value is not auth.  ex. auth-int & auth-priv

        useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));        

      }

    }

    

    private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)

        throws IOException, InterruptedException {

      final RpcSaslProto saslResponse;

      final SaslState state = saslMessage.getState(); // required      

      switch (state) {

        case NEGOTIATE: {

          if (sentNegotiate) {

            throw new AccessControlException(

                "Client already attempted negotiation");

          }

          saslResponse = buildSaslNegotiateResponse();

          // simple-only server negotiate response is success which client

          // interprets as switch to simple

          if (saslResponse.getState() == SaslState.SUCCESS) {

            switchToSimple();

          }

          break;

        }

        case INITIATE: {

          if (saslMessage.getAuthsCount() != 1) {

            throw new SaslException("Client mechanism is malformed");

          }

          // verify the client requested an advertised authType

          SaslAuth clientSaslAuth = saslMessage.getAuths(0);

          if (!negotiateResponse.getAuthsList().contains(clientSaslAuth)) {

            if (sentNegotiate) {

              throw new AccessControlException(

                  clientSaslAuth.getMethod() + " authentication is not enabled."

                      + "  Available:" + enabledAuthMethods);

            }

            saslResponse = buildSaslNegotiateResponse();

            break;

          }

          authMethod = AuthMethod.valueOf(clientSaslAuth.getMethod());

          // abort SASL for SIMPLE auth, server has already ensured that

          // SIMPLE is a legit option above.  we will send no response

          if (authMethod == AuthMethod.SIMPLE) {

            switchToSimple();

            saslResponse = null;

            break;

          }

          // sasl server for tokens may already be instantiated

          if (saslServer == null || authMethod != AuthMethod.TOKEN) {

            saslServer = createSaslServer(authMethod);

          }

          saslResponse = processSaslToken(saslMessage);

          break;

        }

        case RESPONSE: {

          saslResponse = processSaslToken(saslMessage);

          break;

        }

        default:

          throw new SaslException("Client sent unsupported state " + state);

      }

      return saslResponse;

    }


    private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)

        throws SaslException {

      if (!saslMessage.hasToken()) {

        throw new SaslException("Client did not send a token");

      }

      byte[] saslToken = saslMessage.getToken().toByteArray();

      if (LOG.isDebugEnabled()) {

        LOG.debug("Have read input token of size " + saslToken.length

            + " for processing by saslServer.evaluateResponse()");

      }

      saslToken = saslServer.evaluateResponse(saslToken);

      return buildSaslResponse(

          saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,

          saslToken);

    }


    private void switchToSimple() {

      // disable SASL and blank out any SASL server

      authProtocol = AuthProtocol.NONE;

      saslServer = null;

    }

    

    private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {

      if (LOG.isDebugEnabled()) {

        LOG.debug("Will send " + state + " token of size "

            + ((replyToken != null) ? replyToken.length : null)

            + " from saslServer.");

      }

      RpcSaslProto.Builder response = RpcSaslProto.newBuilder();

      response.setState(state);

      if (replyToken != null) {

        response.setToken(ByteString.copyFrom(replyToken));

      }

      return response.build();

    }

    

    private void doSaslReply(Message message) throws IOException {

      if (LOG.isDebugEnabled()) {

        LOG.debug("Sending sasl message "+message);

      }

      setupResponse(saslResponse, saslCall,

          RpcStatusProto.SUCCESS, null,

          new RpcResponseWrapper(message), null, null);

      responder.doRespond(saslCall);

    }

    

    private void doSaslReply(Exception ioe) throws IOException {

      setupResponse(authFailedResponse, authFailedCall,

          RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,

          null, ioe.getClass().getName(), ioe.getLocalizedMessage());

      responder.doRespond(authFailedCall);

    }

    

    private void disposeSasl() {

      if (saslServer != null) {

        try {

          saslServer.dispose();

        } catch (SaslException ignored) {

        }

      }

    }


    private void checkDataLength(int dataLength) throws IOException {

      if (dataLength < 0) {

        String error = "Unexpected data length " + dataLength +

                       "!! from " + getHostAddress();

        LOG.warn(error);

        throw new IOException(error);

      } else if (dataLength > maxDataLength) {

        String error = "Requested data length " + dataLength +

              " is longer than maximum configured RPC length " + 

            maxDataLength + ".  RPC came from " + getHostAddress();

        LOG.warn(error);

        throw new IOException(error);

      }

    }


    /**
     * Reads from the channel and processes at most one RPC.
     *
     * On a new connection the first 4 bytes (read into dataLengthBuffer) and
     * the following 3-byte connection header (version, service class, auth
     * protocol) are validated first. After that, each pass reads a 4-byte
     * length, allocates a buffer of that size, fills it and hands the bytes
     * to processOneRpc.
     *
     * @return the count returned by the last channelRead; -1 is also returned
     *         after an HTTP GET or a bad header/version has been answered
     * @throws IOException if the declared data length is invalid or a read fails
     */
    public int readAndProcess()

        throws WrappedRpcServerException, IOException, InterruptedException {

      while (true) {

        /* Read at most one RPC. If the header is not read completely yet

         * then iterate until we read first RPC or until there is no data left.

         */    

        int count = -1;

        if (dataLengthBuffer.remaining() > 0) {

          // read from the channel; return if it failed or the 4 bytes are
          // not complete yet

          count = channelRead(channel, dataLengthBuffer);       

          if (count < 0 || dataLengthBuffer.remaining() > 0) 

            return count;

        }

        

        if (!connectionHeaderRead) {

          //Every connection is expected to send the header.

          if (connectionHeaderBuf == null) {

            connectionHeaderBuf = ByteBuffer.allocate(3);

          }

          count = channelRead(channel, connectionHeaderBuf);

          if (count < 0 || connectionHeaderBuf.remaining() > 0) {

            return count;

          }

          int version = connectionHeaderBuf.get(0);

          // TODO we should add handler for service class later

          this.setServiceClass(connectionHeaderBuf.get(1));

          // at this point dataLengthBuffer holds the first 4 bytes sent by
          // the client; flip it so it can be compared below

          dataLengthBuffer.flip();

          

          // Check if it looks like the user is hitting an IPC port

          // with an HTTP GET - this is a common error, so we can

          // send back a simple string indicating as much.

          if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {

            setupHttpRequestOnIpcPortResponse();

            return -1;

          }

          

          if (!RpcConstants.HEADER.equals(dataLengthBuffer)

              || version != CURRENT_VERSION) {

            //Warning is ok since this is not supposed to happen.

            LOG.warn("Incorrect header or version mismatch from " + 

                     hostAddress + ":" + remotePort +

                     " got version " + version + 

                     " expected version " + CURRENT_VERSION);

            setupBadVersionResponse(version);

            return -1;

          }

          

          // this may switch us into SIMPLE

          authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));          

          

          // header accepted: reset the length buffer for the first RPC length

          dataLengthBuffer.clear();

          connectionHeaderBuf = null;

          connectionHeaderRead = true;

          continue;

        }

        

        if (data == null) {

          // a full 4-byte length is available: validate it and allocate the
          // body buffer

          dataLengthBuffer.flip();

          dataLength = dataLengthBuffer.getInt();

          checkDataLength(dataLength);

          data = ByteBuffer.allocate(dataLength);

        }

        // read (more of) the RPC body from the channel

        count = channelRead(channel, data);

        

        if (data.remaining() == 0) {

          dataLengthBuffer.clear();

          data.flip();

          // remember whether the connection context had been read before
          // this RPC; if it had not, loop again instead of returning

          boolean isHeaderRead = connectionContextRead;

          processOneRpc(data.array());

          data = null;

          if (!isHeaderRead) {

            continue;

          }

        } 

        return count;

      }

    }

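    // AuthProtocol defines the supported authentication protocols; the byte
    // sent by the client is mapped to one of them here.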

    private AuthProtocol initializeAuthContext(int authType)

        throws IOException {

      AuthProtocol authProtocol = AuthProtocol.valueOf(authType);

      if (authProtocol == null) {

        IOException ioe = new IpcException("Unknown auth protocol:" + authType);

        doSaslReply(ioe);

        throw ioe;        

      }

      boolean isSimpleEnabled = enabledAuthMethods.contains(AuthMethod.SIMPLE);

      switch (authProtocol) {

        case NONE: {

          // don't reply if client is simple and server is insecure

          if (!isSimpleEnabled) {

            IOException ioe = new AccessControlException(

                "SIMPLE authentication is not enabled."

                    + "  Available:" + enabledAuthMethods);

            doSaslReply(ioe);

            throw ioe;

          }

          break;

        }

        default: {

          break;

        }

      }

      return authProtocol;

    }


    private RpcSaslProto buildSaslNegotiateResponse()

        throws IOException, InterruptedException {

      RpcSaslProto negotiateMessage = negotiateResponse;

      // accelerate token negotiation by sending initial challenge

      // in the negotiation response

      if (enabledAuthMethods.contains(AuthMethod.TOKEN)) {

        saslServer = createSaslServer(AuthMethod.TOKEN);

        byte[] challenge = saslServer.evaluateResponse(new byte[0]);

        RpcSaslProto.Builder negotiateBuilder =

            RpcSaslProto.newBuilder(negotiateResponse);

        negotiateBuilder.getAuthsBuilder(0)  // TOKEN is always first

            .setChallenge(ByteString.copyFrom(challenge));

        negotiateMessage = negotiateBuilder.build();

      }

      sentNegotiate = true;

      return negotiateMessage;

    }

    

    private SaslServer createSaslServer(AuthMethod authMethod)

        throws IOException, InterruptedException {

      final Map<String,?> saslProps =

                  saslPropsResolver.getServerProperties(addr);

      return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);

    }

    

    /**

     * Try to set up the response to indicate that the client version

     * is incompatible with the server. This can contain special-case

     * code to speak enough of past IPC protocols to pass back

     * an exception to the caller.

     * @param clientVersion the version the caller is using 

     * @throws IOException

     */

//设置去告诉客户端版本是否与服务端兼容

    private void setupBadVersionResponse(int clientVersion) throws IOException {

      String errMsg = "Server IPC version " + CURRENT_VERSION +

      " cannot communicate with client version " + clientVersion;

      ByteArrayOutputStream buffer = new ByteArrayOutputStream();

      

      if (clientVersion >= 9) {

        // Versions >>9  understand the normal response

        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,

            this);

        setupResponse(buffer, fakeCall, 

            RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,

            null, VersionMismatch.class.getName(), errMsg);

        responder.doRespond(fakeCall);

      } else if (clientVersion >= 3) {

        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,

            this);

        // Versions 3 to 8 use older response

        setupResponseOldVersionFatal(buffer, fakeCall,

            null, VersionMismatch.class.getName(), errMsg);


        responder.doRespond(fakeCall);

      } else if (clientVersion == 2) { // Hadoop 0.18.3

        Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,

            this);

        DataOutputStream out = new DataOutputStream(buffer);

        out.writeInt(0); // call ID

        out.writeBoolean(true); // error

        WritableUtils.writeString(out, VersionMismatch.class.getName());

        WritableUtils.writeString(out, errMsg);

        fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray()));

        

        responder.doRespond(fakeCall);

      }

    }

    

    private void setupHttpRequestOnIpcPortResponse() throws IOException {

      Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);

      fakeCall.setResponse(ByteBuffer.wrap(

          RECEIVED_HTTP_REQ_RESPONSE.getBytes()));

      responder.doRespond(fakeCall);

    }


    /** Reads the connection context following the connection header

     * @param dis - DataInputStream from which to read the header 

     * @throws WrappedRpcServerException - if the header cannot be

     *         deserialized, or the user is not authorized

     */ 

//这个函数是读取连接中头部下面的内容上下文,从DataInputStream中读取

    private void processConnectionContext(DataInputStream dis)

        throws WrappedRpcServerException {

      // allow only one connection context during a session

      if (connectionContextRead) {

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,

            "Connection context already processed");

      }

//从流中解码protobuf协议的内容上下文

      connectionContext = decodeProtobufFromStream(

          IpcConnectionContextProto.newBuilder(), dis);

      protocolName = connectionContext.hasProtocol() ? connectionContext

          .getProtocol() : null;

//得到用户组,接下来认证是否有权限

      UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);

      if (saslServer == null) {

        user = protocolUser;

      } else {

        // user is authenticated

        user.setAuthenticationMethod(authMethod);

        //Now we check if this is a proxy user case. If the protocol user is

        //different from the 'user', it is a proxy user scenario. However, 

        //this is not allowed if user authenticated with DIGEST.

        if ((protocolUser != null)

            && (!protocolUser.getUserName().equals(user.getUserName()))) {

          if (authMethod == AuthMethod.TOKEN) {

            // Not allowed to doAs if token authentication is used

            throw new WrappedRpcServerException(

                RpcErrorCodeProto.FATAL_UNAUTHORIZED,

                new AccessControlException("Authenticated user (" + user

                    + ") doesn't match what the client claims to be ("

                    + protocolUser + ")"));

          } else {

            // Effective user can be different from authenticated user

            // for simple auth or kerberos auth

            // The user is the real user. Now we create a proxy user

            UserGroupInformation realUser = user;

            user = UserGroupInformation.createProxyUser(protocolUser

                .getUserName(), realUser);

          }

        }

      }

      authorizeConnection();

      // don't set until after authz because connection isn't established

      connectionContextRead = true;

    }

    

    /**

     * Process a wrapped RPC Request - unwrap the SASL packet and process

     * each embedded RPC request 

     * @param buf - SASL wrapped request of one or more RPCs

     * @throws IOException - SASL packet cannot be unwrapped

     * @throws InterruptedException

     */    

    private void unwrapPacketAndProcessRpcs(byte[] inBuf)

        throws WrappedRpcServerException, IOException, InterruptedException {

      if (LOG.isDebugEnabled()) {

        LOG.debug("Have read input token of size " + inBuf.length

            + " for processing by saslServer.unwrap()");

      }

      inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);

      ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(

          inBuf));

      // Read all RPCs contained in the inBuf, even partial ones

      while (true) {

        int count = -1;

        if (unwrappedDataLengthBuffer.remaining() > 0) {

          count = channelRead(ch, unwrappedDataLengthBuffer);

          if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)

            return;

        }


        if (unwrappedData == null) {

          unwrappedDataLengthBuffer.flip();

          int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();

          unwrappedData = ByteBuffer.allocate(unwrappedDataLength);

        }


        count = channelRead(ch, unwrappedData);

        if (count <= 0 || unwrappedData.remaining() > 0)

          return;


        if (unwrappedData.remaining() == 0) {

          unwrappedDataLengthBuffer.clear();

          unwrappedData.flip();

          processOneRpc(unwrappedData.array());

          unwrappedData = null;

        }

      }

    }

    

    /**

     * Process an RPC Request - handle connection setup and decoding of

     * request into a Call

     * @param buf - contains the RPC request header and the rpc request

     * @throws IOException - internal error that should not be returned to

     *         client, typically failure to respond to client

     * @throws WrappedRpcServerException - an exception to be sent back to

     *         the client that does not require verbose logging by the

     *         Listener thread

     * @throws InterruptedException

     */    

//如理rpc请求

    private void processOneRpc(byte[] buf)

        throws IOException, WrappedRpcServerException, InterruptedException {

      int callId = -1;

      int retry = RpcConstants.INVALID_RETRY_COUNT;

      try {

        final DataInputStream dis =

            new DataInputStream(new ByteArrayInputStream(buf));

        final RpcRequestHeaderProto header =

            decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);

        callId = header.getCallId();

        retry = header.getRetryCount();

        if (LOG.isDebugEnabled()) {

          LOG.debug(" got #" + callId);

        }

        checkRpcHeaders(header);

        

        if (callId < 0) { // callIds typically used during connection setup

          processRpcOutOfBandRequest(header, dis);

        } else if (!connectionContextRead) {

          throw new WrappedRpcServerException(

              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,

              "Connection context not established");

        } else {

          processRpcRequest(header, dis);

        }

      } catch (WrappedRpcServerException wrse) { // inform client of error

        Throwable ioe = wrse.getCause();

        final Call call = new Call(callId, retry, null, this);

        setupResponse(authFailedResponse, call,

            RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,

            ioe.getClass().getName(), ioe.getMessage());

        responder.doRespond(call);

        throw wrse;

      }

    }


    /**

     * Verify RPC header is valid

     * @param header - RPC request header

     * @throws WrappedRpcServerException - header contains invalid values 

     */

//头部要有几个部分,rpc操作,rpc种类

    private void checkRpcHeaders(RpcRequestHeaderProto header)

        throws WrappedRpcServerException {

      if (!header.hasRpcOp()) {

        String err = " IPC Server: No rpc op in rpcRequestHeader";

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);

      }

      if (header.getRpcOp() != 

          RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {

        String err = "IPC Server does not implement rpc header operation" + 

                header.getRpcOp();

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);

      }

      // If we know the rpc kind, get its class so that we can deserialize

      // (Note it would make more sense to have the handler deserialize but 

      // we continue with this original design.

      if (!header.hasRpcKind()) {

        String err = " IPC Server: No rpc kind in rpcRequestHeader";

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);

      }

    }


    /**

     * Process an RPC Request - the connection headers and context must

     * have been already read

     * @param header - RPC request header

     * @param dis - stream to request payload

     * @throws WrappedRpcServerException - due to fatal rpc layer issues such

     *   as invalid header or deserialization error. In this case a RPC fatal

     *   status response will later be sent back to client.

     * @throws InterruptedException

     */

//处理rpc请求

    private void processRpcRequest(RpcRequestHeaderProto header,

        DataInputStream dis) throws WrappedRpcServerException,

        InterruptedException {

      Class<? extends Writable> rpcRequestClass = 

          getRpcRequestWrapper(header.getRpcKind());

      if (rpcRequestClass == null) {

        LOG.warn("Unknown rpc kind "  + header.getRpcKind() + 

            " from client " + getHostAddress());

        final String err = "Unknown rpc kind in rpc header"  + 

            header.getRpcKind();

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);   

      }

//writable接口是hadoop中所有可序列化实体中需要实现的

      Writable rpcRequest;

      try { 

//Read the rpc request

        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);

        rpcRequest.readFields(dis);

      } catch (Throwable t) { 

// includes runtime exception from newInstance

        LOG.warn("Unable to read call parameters for client " +

                 getHostAddress() + "on connection protocol " +

            this.protocolName + " for rpcKind " + header.getRpcKind(),  t);

        String err = "IPC server unable to read call parameters: "+ t.getMessage();

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);

      }

        

      Call call = new Call(header.getCallId(), header.getRetryCount(),

          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header

              .getClientId().toByteArray());

      callQueue.put(call);              // queue the call; maybe blocked here

      incRpcCount();  // Increment the rpc count

    }



    /**

     * Establish RPC connection setup by negotiating SASL if required, then

     * reading and authorizing the connection header

     * @param header - RPC header

     * @param dis - stream to request payload

     * @throws WrappedRpcServerException - setup failed due to SASL

     *         negotiation failure, premature or invalid connection context,

     *         or other state errors 

     * @throws IOException - failed to send a response back to the client

     * @throws InterruptedException

     */

//简历rpc连接,被sasl协议搭建的,然后读取认证连接头

    private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,

        DataInputStream dis) throws WrappedRpcServerException, IOException,

        InterruptedException {

      final int callId = header.getCallId();

      if (callId == CONNECTION_CONTEXT_CALL_ID) {

        // SASL must be established prior to connection context

        if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {

          throw new WrappedRpcServerException(

              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,

              "Connection header sent during SASL negotiation");

        }

        // read and authorize the user

//读取认证用户

        processConnectionContext(dis);

      } else if (callId == AuthProtocol.SASL.callId) {

        // if client was switched to simple, ignore first SASL message

        if (authProtocol != AuthProtocol.SASL) {

          throw new WrappedRpcServerException(

              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,

              "SASL protocol not requested by client");

        }

        saslReadAndProcess(dis);

      } else if (callId == PING_CALL_ID) {

        LOG.debug("Received ping message");

      } else {

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,

            "Unknown out of band call #" + callId);

      }

    }    


    /**

     * Authorize proxy users to access this server

     * @throws WrappedRpcServerException - user is not allowed to proxy

     */

    private void authorizeConnection() throws WrappedRpcServerException {

      try {

        // If auth method is TOKEN, the token was obtained by the

        // real user for the effective user, therefore not required to

        // authorize real user. doAs is allowed only for simple or kerberos

        // authentication

//如果认证方法是token,这个token要被一个真实有效地的用户获取

        if (user != null && user.getRealUser() != null

            && (authMethod != AuthMethod.TOKEN)) {

          ProxyUsers.authorize(user, this.getHostAddress());

        }

        authorize(user, protocolName, getHostInetAddress());

        if (LOG.isDebugEnabled()) {

          LOG.debug("Successfully authorized " + connectionContext);

        }

        rpcMetrics.incrAuthorizationSuccesses();

      } catch (AuthorizationException ae) {

        LOG.info("Connection from " + this

            + " for protocol " + connectionContext.getProtocol()

            + " is unauthorized for user " + user);

        rpcMetrics.incrAuthorizationFailures();

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);

      }

    }

    

    /**

     * Decode the a protobuf from the given input stream 

     * @param builder - Builder of the protobuf to decode

     * @param dis - DataInputStream to read the protobuf

     * @return Message - decoded protobuf

     * @throws WrappedRpcServerException - deserialization failed

     */

    @SuppressWarnings("unchecked")

//解码流

    private <T extends Message> T decodeProtobufFromStream(Builder builder,

        DataInputStream dis) throws WrappedRpcServerException {

      try {

        builder.mergeDelimitedFrom(dis);

        return (T)builder.build();

      } catch (Exception ioe) {

        Class<?> protoClass = builder.getDefaultInstanceForType().getClass();

        throw new WrappedRpcServerException(

            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,

            "Error decoding " + protoClass.getSimpleName() + ": "+ ioe);

      }

    }


    /**

     * Get service class for connection

     * @return the serviceClass

     */

    public int getServiceClass() {

      return serviceClass;

    }


    /**

     * Set service class for connection

     * @param serviceClass the serviceClass to set

     */

    public void setServiceClass(int serviceClass) {

      this.serviceClass = serviceClass;

    }

//同步关闭方法

    private synchronized void close() {

      disposeSasl();

      data = null;

      dataLengthBuffer = null;

      if (!channel.isOpen())

        return;

      try {socket.shutdownOutput();} catch(Exception e) {

        LOG.debug("Ignoring socket shutdown exception", e);

      }

      if (channel.isOpen()) {

        try {channel.close();} catch(Exception e) {}

      }

      try {socket.close();} catch(Exception e) {}

    }

  }

这里的序列化使用了protobuf,另外还包含RPC请求头和RPC请求体的具体设置。