I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0

UNIX網絡編程5種I/O模型

  • I/O 復用模型(最大的優勢是多路復用)
    Linux提供select/poll,進程通過將一個或多個fd傳遞給select或poll系統調用,阻塞在select操作上,這樣select/poll可以幫我們偵測多個fd是否處于就緒狀態。select/poll是順序掃描fd是否就緒,而且支持的fd數量有限,因此它的使用受到了一些制約。Linux還提供了一個epoll系統調用,epoll使用基于事件驅動方式代替順序掃描,因此性能更高。當有fd就緒時,立即回調函數rollback
  • I/O 多路復用技術
    • I/O 多路復用技術通過把多個I/O 的阻塞復用到同一個select的阻塞上,從而使得系統在單線程的情況下可以同時處理多個客戶端請求
    • 目前支持I/O多路復用的系統調用有select、pselect、poll、epoll,在 Linux網絡編程過程中,很長一段時間都使用select做輪詢和網絡事件通知,然而select的些固有缺陷導致了它的應用受到了很大的限制,最終Linux不得不在新的內核版本中尋找select的替代方案,最終選擇了epoll
      • 支持一個進程打開的socket描述符(FD ) 不受限制(僅受限于操作系統的最大文 件句柄數)
        • select最大的缺陷就是單個進程所打開的FD是有一定限制的,它由FD_SETSIZE設 置,默認值是1024,選擇修改這個宏需要重新編譯內核且網絡效率會下降
        • cat /proc/sys/fs/file- max
      • I/O 效率不會隨著FD數目的增加而線性下降
        • 由于網絡延時或者鏈路空閑,任一時刻只有少部分的socket是 “活躍”的,但是select/poll每次調用都會線性掃描全部的集合,導致效率呈現線性下降。epoll不存在這個問題,它只會對“活躍”的socket進行操作
      • 使用mmap加速內核與用戶空間的消息傳遞
        • 無論是select、poll還是epoll都需要內核把FD消息通知給用戶空間,如何避免不必 要的內存復制就顯得非常重要,epoll是通過內核和用戶空間mmap同一塊內存來實現的
        • mmap-map files or devices into memory
      • epoll的API更加簡單
    • 用來克服select/poll缺點的方法不只有epoll, epoll只是一種Linux的實現方案。在 freeBSD下有kqueue
  • 從5種I/O模型來看,其實都涉及到兩個階段
    • 等待數據準備就緒
    • 數據從內核復制到用戶空間
      • 對于阻塞io,調用recvfrom,阻塞直到第二個階段完成或者錯誤才返回
      • 對于非阻塞io,調用recvfrom,如果緩沖區沒有數據則直接返回錯誤,一般都對非阻塞I/O 模型進行輪詢檢査這個狀態,看內核是不是有數據到來;數據準備后,第二個階段也是阻塞的
      • 對于I/O復用模型,第一個階段進程阻塞在select調用,等待1個或多個套接字(多路)變為可讀,而第二個階段是阻塞的
        • 這里進程是被select阻塞但不是被socket io阻塞
        • java nio實現
          • 是否阻塞configureBlocking(boolean block)
          • selector事件到來時(只是判斷是否可讀/可寫)->具體的讀寫還是由阻塞和非阻塞決定->如阻塞模式下,如果輸入流不足r字節則進入阻塞狀態,而非阻塞模式下則奉行能讀到多少就讀到多少的原則->立即返回->
          • 同理寫也是一樣->selector只是通知可寫->但是能寫多少數據也是有阻塞和非阻塞決定->如阻塞模式->如果底層網絡的輸出緩沖區不能容納r個字節則會進入阻塞狀態->而非阻塞模式下->奉行能輸出多少就輸出多少的原則->立即返回
          • 對于accept->阻塞模式->沒有client連接時,線程會一直阻塞下去->而非阻塞時->沒有客戶端連接->方法立刻返回null->
      • 對于信號驅動I/O模型,應用進程建立SIGIO信號處理程序后立即返回,非阻塞,數據準備就緒時,生成SIGIO信號并通過信號回調應用程序通過recvfrom來讀取數據,第二個階段也是阻塞的
      • 而對于異步I/O模型來說,第二個階段的時候內核已經通知我們數據復制完成了
  • Java NIO的核心類庫多路復用器Selector就是基于epoll的多路復用技術實現
    • Enhancements in JDK 6 Release
      • A new java.nio.channels.SelectorProvider implementation that is based on the Linux epoll event notification facility is included. The epoll facility is available in the Linux 2.6, and newer, kernels. The new epoll-based SelectorProvider implementation is more scalable than the traditional poll-based SelectorProvider implementation when there are thousands of SelectableChannels registered with a Selector. The new SelectorProvider implementation will be used by default when the 2.6 kernel is detected. The poll-based SelectorProvider will be used when a pre-2.6 kernel is detected.
        • 即JDK6版本中默認的SelectorProvider即為epoll(Linux 2.6 kernal)
    • macosx-sun.nio.ch.KQueueSelectorProvider
    • solaris-sun.nio.ch.DevPollSelectorProvider
    • linux
      • 2.6以上版本-sun.nio.ch.EPollSelectorProvider
      • 以下版本-sun.nio.ch.PollSelectorProvider
    • windows-sun.nio.ch.WindowsSelectorProvider
    • Oracle jdk會自動選擇合適的Selector,如果想設置特定的Selector
      • -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
  • Netty Native transports
    • Since 4.0.16, Netty provides the native socket transport for Linux using JNI. This transport has higher performance and produces less garbage
    • Netty's epoll transport uses epoll edge-triggered while java's nio library uses level-triggered. Beside this the epoll transport expose configuration options that are not present with java's nio like TCPCORK, SOREUSEADDR and more.
      • 即Netty的Linux原生傳輸層使用了epoll邊緣觸發
      • 而jdk的nio類庫使用的是epoll水平觸發
    • epoll ET(Edge Triggered) vs LT(Level Triggered)
      • 簡單來說就是當邊緣觸發時,只有 fd 變成可讀或可寫的那一瞬間才會返回事件。當水平觸發時,只要 fd 可讀或可寫,一直都會返回事件
      • 簡單地說,如果你有數據過來了,不去取LT會一直騷擾你,提醒你去取,而ET就告訴你一次,愛取不取,除非有新數據到來,否則不再提醒
      • Nginx大部分event采用epoll EPOLLET(邊沿觸發)的方法來觸發事件,只有listen端口的讀事件是EPOLLLT(水平觸發).對于邊沿觸發,如果出現了可讀事件,必須及時處理,否則可能會出現讀事件不再觸發,連接餓死的情況
  • Java7 NIO2
    • implemented using IOCP on Windows
      • WindowsAsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
      • 即nio2在windows的底層實現是iocp
    • Linux using epoll
      • UnixAsynchronousSocketChannelImpl implements Port.PollableChannel
      • 即nio2在linux 2.6后的底層實現還是epoll
        • 通過epoll模擬異步
        • 個人認為也許linux內核本身的aio實現方案其實并不是很完善,或多或少有這樣或者那樣的問題,即使用了aio,也沒有明顯的性能優勢
          • Not faster than NIO (epoll) on unix systems (which is true)
  • Reactor/Proactor

    • 兩種IO設計模式
    • Reactor-Dispatcher/Notifier
      • Don't call us, we'll call you
    • Proactor-異步io
    • Reactor通過某種變形,可以將其改裝為Proactor,在某些不支持異步I/O的系統上,也可以隱藏底層的實現,利于編寫跨平臺代碼
  • 參考

Java I/O類庫的發展和改進

  • BIO(blocking)
    • 采用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理,處理完成之后,通過輸出流返回應答給客戶端,線程銷毀
      • 該模型最大的問題就是缺乏彈性伸縮能力,當客戶端并發訪問量增加后,服務端的線程個數和客戶端并發訪問數呈1: 1 的正比關系,由于線程是Java虛擬機非常寶貴的系統資源,當線程數膨脹之后,系統的性能將急劇下降,隨著并發訪問量的繼續增大,系統會發生線程堆棧溢出、創建新線程失敗等問題,并最終導致進程宕機或者值死,不能對外提供服務
      • ServerSocket/Socket/輸入輸出流(阻塞)
  • 偽異步I/O
    • 為了改進一線程一連接模型,后來又演進出了一種通過線程池或者消息隊列實現1個或者多個線程處理N個客戶端的模型
    • 采用線程池和任務隊列可以實現一種叫做偽異步的I/O通信框架
    • 當有新的客戶端接入時,將客戶端的Socket封裝成一個Task,投遞到后端的線程池中進行處理,JDK的線程池維護一個消息隊列和N個活躍線程,對消息隊列中的任務進行處理
      • 當對方發送請求或者應答消息比較緩慢,或者網絡傳輸較慢時,讀取輸入流一方的通信線程將被長時間阻塞,如果對方要60s 才能夠將數據發送完成,讀取一方的I/O線程也將會被同步阻塞60s, 在此期間,其他接入消息只能在消息隊列中排隊
      • 當消息的接收方處理緩慢的時候,將不能及時地從TCP緩沖區讀取數據,這將會導致發送方的TCP window size( 滑動窗口)不斷減小,直到為0,雙方處于Keep-Alive狀態,消息發送方將不能再向TCP緩沖區寫入消息
  • NIO
    • SocketChannel和ServerSocketChannel,支持阻塞和非阻塞兩種模式
    • Buffer/Channel/Selector(多路復用器,可同時輪詢多個Channel)
      • java.nio.ByteBuffer的幾個常用方法
        • flip、clear、compact、mark、rewind、hasRemaining、isDirect等
    • 客戶端發起的連接操作是異步的
    • SocketChannel的讀寫操作都是異步的,如果沒有可讀寫的數據它不會同步等待,直接返回,這樣I/O 通信線程就可以處理其他的鏈路,不需要同步等待這個鏈路可用
    • JDK的 Selector在 Linux等主流操作系統上通過epoll實現,它沒有連接句柄數的限制
  • AIO
    • AsynchronousServerSocketChannel、AsynchronousSocketChannel
    • CompletionHandler<V,A>
      • V The result type of the I/O operation
      • A The type of the object attached to the I/O operation
    • 既然已經接收客戶端成功了,為什么還要再次調用accept方法呢?原因是這樣的:調用AsynchronousServerSocketChannel的accept方法后,如果有新的客戶端連接接入,系統將回調我們傳入的CompletionHandler實例的completed方法,表示新的客戶端已經接入成功。因為一個AsynchronousServerSocketChannel可以接收成千上萬個客戶端,所以需要繼續調用它的accept方法,接收其他的客戶端連接,最終形成一個循環。每當接收一個客戶讀連接成功之后,再異步接收新的客戶端連接
  • 不選擇Java原生NIO編程的原因
    • N10的類庫和API繁雜,使用麻煩
    • 需要具備其他的額外技能做鋪墊,例如熟悉Java多線程
    • 可靠性能力補齊,工作景和難度都非常大。例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處理等問題
    • JDK NIO 的 BUG, 例如臭名昭著的epollbug, 它會導致Selector空輪詢,最終導致CPU100%
  • 為什么選擇Netty
    • 健壯性、功能、性能、可定制性和可擴展性在同類框架中都是首屈一指的,它已經得到成百上千的商用項目驗證
    • API使用簡單
    • 預置了多種編解碼功能,支持多種主流協議
    • 可以通過ChannelHand丨er對通信框架進行靈活地擴展
    • 性能高
    • Netty修復了己經發現的所有JDKNIO BUG
    • 社區活躍,版本迭代周期短
    • 經歷了大規模的商業應用考驗,質量得到驗證

Netty 入門

  • ServerBootstrap、EventLoopGroup(boss)、EventLoopGroup(worker)、NioServerSocketChannel、ChannelOption、ChannelInitializer、ChannelPipeline、ChannelFuture、ChannelHandlerAdapter、ChannelHandlerContext
  • Bootstrap、NioSocketChannel
  • try/finally、shutdownGracefully(boss、worker)
  • ChannelHandlerContext的 flush方法,它的作用是將消息發送隊列中的消息寫入SocketChannel中發送給對方.從性能角度考慮,為了防止頻繁地喚醒Selector進行消息發送,Netty的 write方法并不直接將消息寫入SocketChannel中,調用write方法只是把待發送的消息放到發送緩沖數組中,再通過調用flush方法,將發送緩沖區中的消息全部寫到SocketChannel中
  • 基于Netty開發的都是非Web的Java應用,它的打包形態非常簡單,就是一個普通的.jar 包,通常可以使用Eclipse、Ant、Ivy、Gradle等進行構建

TCP 粘包/拆包問題的解決之道

  • TCP是個“流”協議,所謂流,就是沒有界限的一串數據。大家可以想想河里的流水,它們是連成一片的,其間并沒有分界線。TCP底層并不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的劃分,所以在業務上認為 , 一個完 整的包可能會被 TCP拆分成多個包進行發送,也有可能把多個小的包封裝成個大的數據包發送,這就是所謂的TCP粘包和拆包問題。
  • 由于底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決
    • 消息定長,例如每個報文的大小為固定長度200字節,如果不夠,空位補空格
    • 在包尾增加回車換行符進行分割,例如FTP協議
    • 將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段,通常設計思路為消息頭的第一個字段使用int32來表示消息的總長度
  • 沒有考慮讀半包問題,這在功能測試時往往沒有問題,但是一旦壓力上來,或者發送大報文之后,就會存在粘包/拆包問題,如循環發送100條消息,則可能會出現TCP粘包
  • 為了解決TCP粘包/拆包導致的半包讀寫問題,Netty默認提供了多種編解碼器用于處理半包
    • LineBasedFrameDecoder
      • A decoder that splits the received {@link ByteBuf}s on line endings
    • StringDecoder
      • A decoder that splits the received {@link ByteBuf}s on line endings

分隔符和定長解碼器的應用

  • DelimiterBasedFrameDecoder
    • A decoder that splits the received {@link ByteBuf}s by one or more delimiters
  • FixedLengthFrameDecoder
    • A decoder that splits the received {@link ByteBuf}s by the fixed number of bytes

編解碼技術

  • 基于Java提供的對象輸入/輸出流ObjectlnputStream和 ObjectOutputStream,可以直接把Java對象作為可存儲的字節數組寫入文件 ,也可以傳輸到網絡上,Java序列化的目的:
    • 網絡傳輸
    • 對象持久化
  • Java序列化的缺點
    • 無法跨語言
    • 序列化后的碼流太大
      • 對于字符串
        • byte[] value = this.userName.getBytes();
        • buffer.putInt(value.length);
        • buffer.put(value);
    • 序列化性能太低
  • 業界主流的編解碼框架
    • Google的Protobuf
    • Facebook的Thrift
    • JBoss Marshalling
      • JBoss Marshalling是一個Java對象的序列化API包,修正了 JDK自帶的序列化包的很多問題,但又保持跟java.io.Serializable接口的兼容

MessagePack編解碼

  • MessagePack介紹
    • It's like JSON. but fast and small
    • MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. But it's faster and smaller
    • http://msgpack.org/
    • 提供了對多語言的支持
  • API介紹

    // Create serialize objects.
         List<String> src = new ArrayList<String>();
         src. add (,,msgpackw);
         src.add("kumofs");
         src.add("viver">;
         MessagePack msgpack = new MessagePack();
         // Serialize
         byte[] raw = msgpack.write(src);
         // Deserialize directly using a template
         L±8t<String> dstl = msgpack. read (raw, Ten 5 >lates . tList (Ten^>lates. TString));
         
  • MessagePack編碼器和解碼器開發

    • MessageToByteEncoder<I
    • ByteToMessageDecoder、MessageToMessageDecoder<I
    • LengthFieldBasedFrameDecoder extends ByteToMessageDecoder
      • A decoder that splits the received {@link ByteBuf}s dynamically by the value of the length field in the message
      • public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast)
      • 功能很強大,可指定消息長度字段的偏移等,而不僅僅是消息頭的第一個字段就是長度
    • LengthFieldPrepender extends MessageToMessageEncoder
      • An encoder that prepends the length of the message.
      • 可自動前面加上消息長度字段

Google Protobuf 編解碼

  • 主要使用了netty默認提供的關于protobuf的編解碼器
    • ProtobufVarint32FrameDecoder extends ByteToMessageDecoder
      • A decoder that splits the received {@link ByteBuf}s dynamically by the value of the Google Protocol Buffers
    • ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder
      • An encoder that prepends the the Google Protocol Buffers
    • ProtobufEncoder extends MessageToMessageEncoder
      • Encodes the requested Google Protocol Buffers Message And MessageLite into a {@link ByteBuf}
    • ProtobufDecoder extends MessageToMessageDecoder
      • Decodes a received {@link ByteBuf} into a Google Protocol Buffers Message And MessageLite
      • 注意其構造函數要傳一個MessageLite對象,即協議類型,用來反序列化

 BEFORE DECODE (302 bytes) AFTER DECODE (300 bytes)
+--------+---------------+ +---------------+
| Length | Protobuf Data |----->| Protobuf Data |
| 0xAC02 | (300 bytes) | | (300 bytes) |
+--------+---------------+ +---------------+
BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes)
+---------------+ +--------+---------------+
| Protobuf Data |-------------->| Length | Protobuf Data |
| (300 bytes) | | 0xAC02 | (300 bytes) |
+---------------+ +--------+---------------+
  • Protobuf的使用注意事項
    • ProtobufDecoder僅僅負責解碼,它不支持讀半包。因此,在 ProtobufDecoder前面, 一定要有能夠處理讀半包的解碼器
      • 使用Netty提供的ProtobufVarint32FrameDecoder,它可以處理半包消息
      • 繼承Netty提供的通用半包解碼器LengthFieldBasedFrameDecoder
      • 繼承ByteToMessageDecoder類,自己處理半包消息

JBoss Marshalling 編解碼

  • JBoss的Marshalling完全兼容JDK序列化
  • MarshallingDecoder extends LengthFieldBasedFrameDecoder
    • Decoder which MUST be used with {@link MarshallingEncoder}
    • 需要傳入UnmarshallerProvider和maxObjectSize
  • MarshallingEncoder extends MessageToByteEncoder
    • {@link MessageToByteEncoder} implementation which uses JBoss Marshalling to marshal an Object
    • 需要傳入MarshallerProvider
  • Netty的Marshalling編解碼器支持半包和粘包的處理,對于開發者而言,只需要正確地將 Marshalling編碼器和解碼器加入到ChannelPipeline 中,就能實現對Marshalling序列化的支持
  • HTTP協議開發應用

    • HTTP請求消息(HttpRequest)
      • HTTP請求行
      • HTTP消息頭
      • HTTP請求正文
    • HTTP響應消息(HttpResponse)
      • 狀態行、消息報頭、響應正文
    • Netty HTTP文件服務器
      • HttpRequestDecoder
      • HttpObjectAggregator
      • HttpResponseEncoder
      • ChunkedWriteHandler、ChunkedFile
      • FullHttpRequest、FullHttpResponse、DefaultFullHttpResponse
    • Netty HTTP+ XML協議棧開發
      • 很多基于HTTP的應用都是后臺應用,HTTP僅僅是承載數據交換的一個通道,是一個載體而不是Web容器
      • JiBX是一款非常優秀的XML (Extensible Markup Language) 數據綁定框架
        • JiBX is a tool for binding XML data to Java objects
        • Unmarshal是將XML文件轉換成Java對象,而 Marshal則是將Java對象編排成規范的XML文件
        • xpp-XML Pull Parsing
      • 過程
        • 構造請求消息HttpXmlRequest(封裝一個FullHttpRequest和一個Object)
        • 定義請求消息編碼器HttpXmlRequestEncoder,對HttpXmlRequest進行編碼
          • 對請求消息中的業務object通過jibx序列化為xml字符串,隨后將它封裝成Netty的 ByteBuf
          • 構造HTTP消息頭-HttpHeaders/DefaultFullHttpRequest
          • 請求消息消息體不為空,也沒有使用Chunk方式,所以在HTTP消息頭中設置消息體的長度Content-Length
          • 后續Netty的 HTTP請求編碼器繼續對HTTP請求消息進行編碼
        • 定義請求消息解碼器HttpXmlRequestDecoder
          • 從HTTP消息體中獲取請求碼流,通過JiBx框架對它進行反序列化(FullHttpRequest#content),得到請求object對象,并封裝為HttpXmlRequest
          • 回調業務handler, 業務得到的就是解碼后的POJO對象和HTTP消息頭
          • 注意-decoder有一個參數是業務obj的clazz對象
        • 同理封裝一個應答消息HttpXmlResponse(封裝一個FullHttpResponse和一個Obect)
        • 定義應答消息編碼器HttpXmlResponseEncoder
          • 同上,對應答消息中的object通過jibx序列化xml字符串并轉為ByteBuf
          • 構造HTTP應答消息FullHttpResponse,這里注意因為 Netty的 DefaultFullHttpResponse沒有提供動態設置消息體content的接口,只能在第一次構造的時候設置內容,同上也需要設置Content-Length,Content-Type為text/xml
        • 定義應答消息解碼器HttpXmlResponseDecoder
          • DefaultFullHttpResponse和HTTP應答消息反序列化后的object對象構造HttpXmlResponse(DefaultFullHttpResponse#content)
      • 流程
        • 客戶端
          • HttpResponseDecoder、HttpObjectAggregator、HttpXmlResponseDecoder(傳入object clazz)、HttpRequestEncoder、HttpXmlRequestEncoder
          • HttpXmlClientHandle
            • channelActive時構造HttpXmlRequest
            • messageReceived中直接得到HttpXmlResponse
        • 服務端
          • HttpRequestDecoder、HttpObjectAggregator、HttpXmlRequestDecoder(傳入object clazz)、HttpResponseEncoder、HttpXmlResponseEncoder
          • HttpXmlServerHandler
            • messageReceived中直接得到HttpXmlRequest進行業務處理,然后發送HttpXmlResponse,如果非keepAlive模式,則發送完畢候關閉鏈接(通過在ChannelFuture加一個Listener監聽)

    WebSocket協議開發

    • WebSocket是 HTML5 開始提供的一種瀏覽器與服務器間進行全雙工通信的網絡技術
    • 在 WebSocketAPI中,瀏覽器和服務器只需要做個握手的動作,然后,瀏覽器和服務器之間就形成了一條快速通道,兩者就可以直接互相傳送數據了。WebSocket基于TCP雙向全雙工進行消息傳遞,在同一時刻,既可以發送消息,也可以接收消息,相比HTTP的半雙工協議,性能得到很大提升
    • WebSocket設計出來的目的就是要取代輪詢和Comet技術,使客戶端瀏覽器具備像 C/S 架構下桌面系統一樣的實時通信能力
    • 為了建立一個WebSocket連接,客戶端瀏覽器首先要向服務器發起一個HTTP請求,這個請求和通常的HTTP請求不同.包含了一些附加頭信息,其中附加頭信息“Upgrade:WebSocket”表明這是個申請協議升級的HTTP請求。服務器端解析這些附加的頭信息,然后生成應答信息返回給客戶端,客戶端和服務器端的WebSocket連接就建立起來了,雙方可以通過這個連接通道自由地傳速信息,并且這個連接會持續存在直到客戶端或者服務器端的某一方主動關閉連接
    • 握手成功之后,服務端和客戶端就可以通過“messages”的方式進行通信了,一個消息由一個或者多個幀組成
    • Netty WebSocket 協議開發
      • Netty內置了WebSocket協議相關的api
      • WebSocketServer
        • HttpServerCodec、HttpObjectAggregator、ChunkedWriteHandler
        • WebSocketServerHandler
          • messageReceived,第一次握手請求消息由HTTP協議承載,所以它是一個HTTP消息,執行handleHttpRequest方法來處理WebSocket握手請求,對握手請求消息進行判斷,如果消息頭中沒有包含Upgrade字段或者它的值不是websocket, 則返回HTTP 400響應
          • 握手請求簡單校驗通過之后,開始構造握手工廠,創建握手處理類WebSocketServerHandshaker, 通過它構造握手響應消息返回給客戶端,同時將WebSocket相關的編碼和解碼類動態添加到ChannelPipeline中,用于WebSocket消息的編解碼(WebSocketServerHandshaker#handshake)
          • 添加WebSocket Encoder和 WebSocket Decoder之后,服務端就可以自動對WebSocket
          • 消息進行編解碼了,后面的業務handler可以直接對WebSocket對象進行操作(WebSocketFrame)
          • 直接對控制幀進行判斷并返回應答消息
      • 而客戶端則是嵌套在html中,由js進行websocket的相關接口開發

    私有協議棧開發

    • 在傳統的Java應用中,通常使用以下4 種方式進行跨節點通信
      • 通過RM1進行遠程服務調用
      • 通過Java的 Socket+Java序列化的方式進行跨節點調用
      • 利用一些開源的RPC框架進行遠程服務調用,例如Facebook的Thrift、 Apache 的Avro等
      • 利用標準的公有協議進行跨節點服務調用,例如HTTP+XML、RESTful+JSON或 者 WebService
    • 跨節點的遠程服務調用,除了鏈路層的物理連接外,還需要對請求和響應消息進行編解碼。在請求和應答消息本身以外,也需要攜帶一些其他控制和管理類指令,例如鏈路建立的握手請求和響應消息、鏈路檢測的心跳消息等。當這些功能組合到一起之后,就會形成私有協議
    • Netty協議棧功能設計
      • Netty協議棧用于內部各模塊之間的通信,它基于TCP/IP協議棧,是一個類HTTP協議的應用層協議棧
      • 在分布式組網環境下,每個Netty節 點 (Netty進程)之間建立長連接,使用Netty協議進行通信。Netty節點并沒有服務端和客戶端的區分,誰首先發起連接,誰就作為客戶端,另一方自然就成為服務端。一個Netty節點既可以作為客戶端連接另外的Netty節點,也可以作為Netty服務端被其他Netty節點連接
    • 協議棧功能描述
      • 承載了業務內部各模塊之間的消息交互和服務調用
      • 基于Netty的NIO通信框架,提供髙性能的異步通信能力
      • 提供消息的編解碼框架,可以實現POJO的序列化和反序列化
      • 提供基于IP地址的白名申.接入認證機制
      • 鏈路的有效性校驗機制
      • 鏈路的斷連重連機制
    • 通信模型
      • Netty協議棧客戶端發送握手請求消息,攜帶節點ID等有效身份認證信息
      • Netty協議棧服務端對握手請求消息進行合法性校驗,包括節點ID有效性校驗、節點重復登錄校驗和IP地址合法性校驗,校驗通過后,返回登錄成功的握手應答消息
      • 鏈路建立成功之后,客戶端發送業務消息
      • 鏈路成功之后,服務端發送心跳消息
      • 鏈路建立成功之后,客戶端發送心跳消息
      • 鏈路建立成功之后,服務端發送業務消息
      • 服務端退出時,服務端關閉連接,客戶端感知對方關閉連接后,被動關閉客戶端連接
      • Netty協議通信雙方鏈路建立成功之后,雙方可以進行全雙工通信,無論客戶端還是服務端,都可以主動發送請求消息給對方,通信方式可以是TWOWAY或者ONE WAY, 雙方之間的心跳采用Ping-Pong機制,當鏈路處于空閑狀態時,客戶端主動發送Ping消息給服務端,服務端接收到Ping消息后發送應答消息Pong給客戶端,如果客戶端連續發送N 條 Ping消息都沒有接收到服務端返回的Pong消息,說明鏈路已經掛死或者對方處于異常狀態,客戶端主動關閉連接,間隔周期T 后發起重連操作,直到重連成功
    • 消息定義
      • 消息頭
        • crcCode int 32位 消息校驗碼 = OxABEF(2字節) + 主版本號(1字節) + 次版本號(1字節)
        • length int 32位 消息長度= 消息頭的長度+消息體長度
        • sessionID long 64位 集群節點全局唯一id
        • type byte 8位 消息類型(包括握手請求、應答、心跳請求、應答等)
        • priority byte 8位 消息優先級
        • attachment Map<String,Object> 變長 可選字段,用于擴展消息頭
      • 消息體
        • Object 變長
    • 鏈路的建立
      • 考慮到安全,鏈路建立需要通過基于IP 地址或者號段的黑白名單安全認證機制,在實際商用項目中,安全認證機制會更加嚴格,如通過密鑰對用戶名和密碼進行安全認證
      • 客戶端與服務端鏈路建立成功之后,由客戶端發送握手請求消息
      • 服務端接收到客戶端的握手請求消息之后,如 果 IP 校驗通過,返回握手成功應答消息給客戶端,應用層鏈路建立成功
    • 鏈路的關閉
      • 當對方宕機或者重啟時,會主動關閉鏈路,另一方讀取到操作系統的通知信號,得知對方REST鏈路,需要關閉連接,釋放自身的句柄等資源
        • proc/sys/net/ipv4/tcp_retries2
      • 消息讀寫過程中,發生了 I/O 異常,需要主動關閉連接
      • 心跳消息讀寫過程中發生了 I/O 異常,需要主動關閉連接
      • 心跳超時,需要主動關閉連接
      • 發生編碼異常等不可恢復錯誤時,需要主動關閉連接
    • 可靠性設計
      • Netty協議棧可能會運行在非常惡劣的網絡環境中,網絡超時、閃斷、對方進程僵死或者處理緩慢等情況都有可能發生
      • 心跳機制
        • 在網絡空閑時采用心跳機制來檢測鏈路的互通性, 一旦發現網絡故障,立即關閉鏈路,主動重連
        • 當網絡處于空閑狀態持續時間達到T(連續周期T沒有讀寫消息)時,客戶端主動發送Ping心跳消息給服務端
        • 如果在下一個周期T到來時客戶端沒有收到對方發送的Pong心眺應答消息或者讀取到服務端發送的其他業務消息,則心跳失敗計數器加1
        • 每當客戶端接收到服務的業務消息或者Pong應答消息時,將心跳失敗計數器清零:連續N次沒有接收到服務端的Pong消息或者業務消息,則關閉鏈路,間隔INTERVAL時間后發起重連操作
        • 服務端網絡空閑狀態持續時間達到T后,服務端將心跳失敗計數器加1;只要接收到客戶端發送的Ping消息或者其他業務消息,計數器清零
        • 服務端連續N次沒有接收到客戶端的Ping消息或者其他業務消息,則關閉鏈路,釋放資源,等待客戶端重連
        • 通過Ping-Pong雙向心跳機制,可以保證無論通信哪一方出現網絡故障,都能被及時地檢測出來。為了防止由于對方短時間內繁忙沒有及時返回應答造成的誤判,只有連續N次心跳檢測都失敗才認定鏈路己經損害,需要關閉鏈路并重建鏈路
        • 當讀或者寫心跳消息發生I/O異常的時候,說明鏈路己經中斷,此時需要立即關閉鏈路,如果是客戶端,需要重新發起連接。如果是服務端,需要清空緩存的半包信息,等待客戶端重連
          • 之前的項目中則直接是客戶端每30s向服務器發送一個ping消息同時服務器返回一個ping消息;如果30s左右(網絡延遲),鏈路空閑則斷開鏈接;偌一方出現宕機則30s后可直接檢測數來--但相對比而言,作者的方法更為嚴謹一些
      • 重連機制
        • 如果鏈路中斷,等待INTERVAL時間后,由客戶端發起重連操作,如果重連失敗,間隔周期INTERVAL后再次發起重連,直到重連成功
        • 為了保證服務端能夠有充足的時間釋放句柄資源,在首次斷連時客戶端需要等待INTERVAL時間之后再發起重連,而不是失敗后就立即重連
        • 為了保證句柄資源能夠及時釋放,無論什么場景下的重連失敗,客戶端都必須保證自身的資源被及時釋放
        • 重連失敗后,需要打印異常堆棧信息,方便后續的問題定位
      • 重復登錄保護
        • 當客戶端握手成功之后,在鏈路處于正常狀態下,不允許客戶端重復登錄,以防止客戶端在異常狀態下反復重連導致句柄資源被耗盡
        • 緩存客戶端的地址列表,通過該列表檢查客戶端是否已登陸
        • 當服務端連續N次心跳超時之后需要主動關閉鏈路,清空該客戶端的地址緩存信息,以保證后續該客戶端可以重連成功,防止被重復登錄保護機制拒絕掉
          • 猜測是可能會出現類似客戶端認為舊鏈接已經logout了,嘗試重新登陸;但是服務器認為舊的鏈路還在(如客戶端宕機)等;所以還是需要心跳機制輔助
      • 消息緩存重發
        • 無論客戶端還是服務端,當發生鏈路中斷之后,在鏈路恢復之前,緩存在消息隊列中待發送的消息不能丟失,等鏈路恢復之后,重新發送這些消息,保證鏈路中斷期間消息不丟失
        • 考慮到內存溢出的風險,建議消息緩存隊列設置上限,當達到上限之后,應該拒絕繼續向該隊列添加新的消息
      • 安全性設計
        • 為了保證整個集群環境的安全,內部長連接采用基于IP 地址的安全認證機制,服務端對握手請求消息的IP 地址進行合法性校驗-白名單
        • 如果將Netty協議棧放到公網中使用,需要采用更加嚴格的安全認證機制,例如基于密鑰和AES加密的用戶名+密碼認證機制,也可以采用SSL/TSL安全傳輸
      • 可擴展性設計
        • 通過Netty消息頭中的可選附件attachment字段,業務可以方便地進行自定義擴展
        • Netty協議棧架構需要具備一定的擴展能力,例如統一的消息攔截、接口日志、安全、加解密等可以被方便地添加和刪除,不需要修改之前的邏輯代碼,類 似 Servlet的 FilterChain和 AOP, 但考慮到性能因素,不推薦通過AOP來實現功能的擴展
          • 直接通過ChannelPipeline即可
    • 開發過程
      • 定義協議消息,NettyMessage,包括消息頭Header和消息體Object
        • 消息頭則包括之前說的crcCode、length等字段
      • 定義編解碼類
        • NettyMessageEncoder extends MessageToByteEncoder
          • 編碼header
          • 對于header中的attachment,則首先寫入map的長度,然后遍歷map,寫key的長度,寫key的bytes(String),最后通過JBoss Marshalling對value進行編碼
            • 注意在作者的這個示例中,自己手動寫的MarshallingEncoder,其實netty已經提供了MarshallingEncoder(參數不同),通過源代碼可以看到,encode核心代碼部分基本相同
          • 編碼消息體
          • 最后將消息長度重寫
        • NettyMessageDecoder extends LengthFieldBasedFrameDecoder
          • 這里用到了Netty的LengthFieldBasedFrameDecoder解碼器,它支持自動的TCP粘包和半包處理,只需要給出標識消息長度的字段偏移量和消息長度自身所占的字節數,netty就能自動實現對半包的處理
          • 解碼過程和編碼過程恰恰相反,這里不再詳述
        • 握手和安全認證
          • LoginAuthReqHandler,在通道激活時發起握手請求
          • 客戶端握手請求發送之后,按照協議規范,服務端需要返回握手應答消息(channelRead中),首先判斷消息是否是握手應答消息,如果不是,直接透傳給后面的 ChannelHandler 進行處理;如果是握手應答消息,則對應答結果進行判斷,如果非0 , 說明認證失敗,關閉鏈路,重新發起連接
          • LoginAuthRespHandler#channelRead,對重復登陸進行判斷,然后白名單判斷,如果均校驗成功則構造握手應答消息
          • 當發生異常關閉鏈路的時候需要將客戶端的信息從登錄注冊表中刪除,以保證后續客戶端可以重連成功
      • 心跳檢測機制
        • 握手成功之后,由客戶端主動發送心跳消息,服務端接收到心跳消息之后,返回心跳應答消息
        • 當握手成功之后,握手請求Handler會繼續將握手成功消息向下透傳至HeartBeatReqHandler,接收到之后對消息進行判斷,如果是握手成功消息,則啟動無限循環 定時器用于定期發送心跳消息。由于NioEventLoop是一個Schedule, 因此它支持定時器的執行,如每5s發送一條心跳消息
        • 而服務器的HeartBeatRespHandler則比較簡單,接收到心跳請求消息之后,構造心跳應答消息返回即可'
        • 心跳超時的實現非常簡單,直接利用Netty的 ReadTimeoutHandler機制,當一定周期內 (默認值50s) 沒有讀取到對方任何消息時,需要主動關閉鏈路。如果是客戶端,重新發起連接;如果是服務端,釋放資源,清除客戶端登錄緩存信息,等待服務端重連
          • 可參考ReadTimeoutHandler#readTimedOut
        • 斷線重連
          • 當客戶端感知斷連事件之后,釋放資源,重新發起連接
          • try塊代碼最后一行是future.channel().closeFuture().sync(),線程等待鏈接關閉
          • 當關閉后執行finally塊代碼,嘗試重連
      • 客戶端代碼
        • NettyMessageDecoder(1024 * 1024, 4, 4)、NettyMessageEncoder、ReadTimeoutHandler(50)、LoginAuthReqHandler、HeartBeatReqHandler
        • 這次我們綁定了本地端口,主要用于服務端重復登錄保護,另外,從產品管理角度看,一般情況下不允許系統隨便使用隨機端口
        • 利用Netty的 ChannelPipeline和 Channe丨Handler機制,可以非常方便地實現功能解耦 和業務產品的定制。例如本例程中的心跳定時器、握手請求和后端的業務處理可以通過不同的Handler來實現,類似于AOP。通過Handler Chain的機制可以方便地實現切面攔截和定制,相比于AOP它的性能更高
      • 服務端代碼
        • NettyMessageDecoder(1024 * 1024, 4, 4)、NettyMessageEncoder、ReadTimeoutHandler(50)、LoginAuthRespHandler、HeartBeatRespHandler
        • 客戶端宕機重啟之后,服務端需要能夠清除緩存信息,允許客戶端重新登錄

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
         if (!closed) {
         ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
         ctx.close();
         closed = true;
         }
         }
         

    • 總結
      • 當鏈路斷連的時候,已經放入發送隊列中的消息不能丟失,更加通用的做法是提供通知機制,將發送失敗的消息通知給業務測,由業務做決定:是丟棄還是緩存重發
    • 關于編解碼的一些測試
      • sendBuf.setInt(4, sendBuf.readableBytes() - 8)
        • 這個是編碼最后將長度值寫入了,這個為啥-8呢
      • NettyMessageDecoder(1024 * 1024, 4, 4))
        • 可以看到解碼傳入的LengthFieldBasedFrameDecoder參數分別是4,4,兩個4分別代碼lengthFieldOffset和lengthFieldLength,即長度字段的偏移和長度字段的表示長度
      • 從下面的源碼注釋來看,這個長度其實是指長度字段后的所有字節的長度,從解碼角度來說也是如此,從源碼解讀亦如此,請注意這個問題
      • 所以建議好好看源代碼,源代碼才是王道

    lengthFieldOffset = 2
         lengthFieldLength = 3
         lengthAdjustment = 0
         initialBytesToStrip = 0
         BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
         +--------+--------+----------------+ +----------+----------+--------------
         |Header 1| Length |Actual Content |----->| Header 1 | Length | Actual Content |
         |0xCAFE | 0x00000C| "HELLO, WORLD"| | 0xCAFE | 0x00000C | "HELLO, WORLD"
         

    服務端創建

    • Netty服務端創建關鍵步驟
      • 創建ServerBootstrap實例。ServerBootstrap是 Netty服務端的啟動輔助類,它提供了一系列的方法用于設置服務端啟動相關的參數-門面模式對各種能力進行抽象和封裝
        • 引入Builder模式,因為參數太多
      • 設置并綁定Reactor線程池。Netty的 Reactor線程池是EventLoopGroup,它實際就是 EventLoop的數組。EventLoop的職責是處理所有注冊到本線程多路復用器Selector上的Channel,Selector的輪詢操作由綁定的EventLoop線程run方法驅動,在一個循環體內循環執行.
        • EventLoop的職責不僅僅是處理網絡I/O 事件,用戶自定義的Task和定時任務Task也 統 由 EventLoop負責處理,這樣線程模型就實現了統一
        • 從調度層面看,也不存在從EventLoop線程中再啟動其他類型的線程用于異步執行另外的任務,這樣就避免了多線程并發操作和鎖競爭,提升了I/O 線程的處理和調度性能
      • 設置并綁定服務端Channel。作為NIO服務端,需要創建ServerSocketChannel,Netty對原生的NIO類庫進行了封裝,對應實現是NioServerSocketChannel
        • Netty通過工廠類,利用反射創建NioServerSocketChannel對象。由于服務端監聽端口往往只需要在系統啟動時才會調用,因此反射對性能的影響并不大
        • ServerBootstrapChannelFactory
      • 鏈路建立的時候創建并初始化ChannelPipeline。ChannelPipeline并不是NIO服務端必需的,它本質就是一個負責處理網絡事件的職責鏈,負責管理和執行ChannelHandler。網絡事件以事件流的形式在ChannelPipeline中流轉。典型的網絡事件如下:
        • 鏈路注冊、鏈路激活、鏈路斷開、接收到請求消息、請求消息接收并處理完畢、發送應答消息、鏈路發生異常、發生用戶自定義事件
      • 初始化 ChannelPipeline 完成之后,添加并設置 ChannelHandler是 Netty提供給用戶定制和擴展的關鍵接口。利用ChannelHandler用戶可以完成大多數的功能定制,例如消息編解碼、心跳、安全認證、TSL/SSL認證、流量控制和流景整形等。
        • Netty同時也提供了大量的系統ChannelHandler供用戶使用 ,比較實用的系統ChannelHandler如:
          • ByteToMessageCodec、LengthFieldBasedFrameDecoder、 LoggingHandler、SslHandler、 IdleStateHandler、 ChannelTrafficShapingHandler、Base64Decoder 和 Base64Encoder
      • 綁定并啟動監聽端口。在綁定監聽端口之前系統會做一系列的初始化和檢測工作,完成之后,會啟動監聽端口,并將ServerSocketChannel注冊到Selector上監聽客戶端連接
      • Selector輪詢。由Reactor線程NioEventLoop負責調度和執行Selector輪詢操作,選擇準備就緒的Channel集合
        • 參考源代碼NioEventLoopGroup、NioEventLoop的源代碼實現
          • 會傳入SelectorProvider.provider用來打開Selector
          • 每一個NioEventLoop持有一個Selector
      • 當輪詢到準備就緒的Channel之后,就 由 Reactor線 程 NioEventLoop執ChannelPipeline的相應方法,最終調度并執行ChannelHandler
        • 源碼請參考ChannelPipeline
        • fireXXX->DefaultChannelPipeline->AbstractChannelHandlerContext.invokeXXX(head)
      • 執行Netty系統ChannelHandler和用戶添加定制的ChannelHandler

     I/O Request
         * via {@link Channel} or
         * {@link ChannelHandlerContext}
         * |
         * +---------------------------------------------------+---------------+
         * | ChannelPipeline | |
         * | \|/ |
         * | +---------------------+ +-----------+----------+ |
         * | | Inbound Handler N | | Outbound Handler 1 | |
         * | +----------+----------+ +-----------+----------+ |
         * | /|\ | |
         * | | \|/ |
         * | +----------+----------+ +-----------+----------+ |
         * | | Inbound Handler N-1 | | Outbound Handler 2 | |
         * | +----------+----------+ +-----------+----------+ |
         * | /|\ . |
         * | . . |
         * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
         * | [ method call] [method call] |
         * | . . |
         * | . \|/ |
         * | +----------+----------+ +-----------+----------+ |
         * | | Inbound Handler 2 | | Outbound Handler M-1 | |
         * | +----------+----------+ +-----------+----------+ |
         * | /|\ | |
         * | | \|/ |
         * | +----------+----------+ +-----------+----------+ |
         * | | Inbound Handler 1 | | Outbound Handler M | |
         * | +----------+----------+ +-----------+----------+ |
         * | /|\ | |
         * +---------------+-----------------------------------+---------------+
         * | \|/
         * +---------------+-----------------------------------+---------------+
         * | | | |
         * | [ Socket.read() ] [ Socket.write() ] |
         * | |
         * | Netty Internal I/O Threads (Transport Implementation) |
         * +-------------------------------------------------------------------+
         

    • Netty服務端創建源碼分析
      • EventLoopGroup acceptorGroup = new NioEventLoopGroup()、EventLoopGroup IOGroup = new NioEventLoopGroup()
        • acceptor線程池
        • io-processor線程池
        • 并不是必須要創建兩個不同的EventLoopGroup, 也可以只創建一個并共享
      • TCP的backlog參數
        • backlog指定了內核為此套接口排隊的最大連接個數,對于給定的監聽套接口,內核要維護兩個隊列:未鏈接隊列和已連接隊列
          • 和tcp建立鏈接的三次握手相關
          • backlog被規定為兩個隊列總和的最大值
          • Netty默認的backlog為100,Lighttpd中此值達到128x8,可根據實際場景和網絡狀況進行靈活設置
      • TCP參數設置完成后,用戶可以為啟動輔助類和其父類分別指定Handler。兩類Handler的用途不同
        • AbstractBootstrap#handler,指定父類的handler
        • ServerBootstrap#childHandler,指定子類的handler
        • 通過看源代碼,發現此書書中的描述有點不準確(或者說不直觀),其實父類中的handler是添加到ServerSocketChannel的pipeline的,這個handler在server啟動后就行執行,如LoggingHandler
        • 子類的handler是添加導SocketChannel的pipeline的
      • 最后一步,就是綁定本地端口,啟動服務
        • AbstractBootstrap#doBind、initAndRegister
        • ServerBootstrap#init(Channel channel)
          • 設置Socket參數和NioServerSocketChannel的附加屬性
          • 將AbstractBootstrap的Handler添加到 NioServerSocketChannel的 ChannelPipeline中
          • 將用于服務端注冊的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中
            • ServerBootstrapAcceptor#channelRead
              • child.pipeline().addLast(childHandler),將子handler加到了SocketChannel的pipeline中
      • 當 NioServerSocketChannel 初始化完成之后,需要將它注冊到Reactor線程的多路復用器上監聽新客戶端的接入
        • AbstractChannel$AbstractUnsafe#register#register0
          • AbstractNioChannel#doRegister->注冊到NioEventLoop的Selector上
          • pipeline.fireChannelRegistered,觸發注冊事件,傳遞給pipeline,執行父類的 handler
          • 另外判斷是否是NioEventLoop自身發起的操作。如果是,則不存在并發操作,直接執行Channel注冊:如果由其他線程發起,則封裝成一個Task放入消息隊列中異步執行。此處,由于是由ServerBootstrap所在線程執行的注冊操作,所以會將其封裝成Task投遞到NioEventLoop中執行
        • ServerBootstrap#createChannel
          • EventLoop eventLoop = group().next()
            • 這里順序選取了一個線程,注意這里的group是指parentGroup
    • 客戶端接入源碼分析
      • 負責處理網絡讀寫、連接和客戶端請求接入的Reactor線程就是NioEventLoop,當多路復用器檢測到新的準備就緒的Channel時,默認執行processSelectedKeysOptimized
        • NioEventLoop#run
        • 由于Channel的Attachment是NioServerSocketChannel, 所以執processSelectedKey
          • 根據就緒的操作位,執行不同的操作
          • unsafe.read()
            • NioMessageUnsafe#read
              • NioServerSocketChannel#doReadMessages
                • 接收新的客戶端連接(調用accept)并創建NioSocketChannels
                • 注意初始化new NioSocketChannel(this, childEventLoopGroup().next(), ch),即要傳入childGroup中的一個線程
              • 接收到新的客戶端連接后,觸發ChannelPipeline ChannelRead方法
              • 執行headChannelHandlerContext的fireChannelRead 方法,事件在 ChannelPipeline中傳遞,執行ServerBootstrapAcceptor的channelRead方法
                • 將啟動時傳入的childHandler加入到客戶端SocketChannel的 ChannelPipeline中
                • 設置客戶端SocketChannel的TCP參數
                • 注冊SocketChannel到多路復用器
    • 總結
      • 源代碼分析是基于netty-all-5.0.0.Alpha1版本,其實有些代碼和4.x還是有一些區別的
      • 對于NioServerSocketChannel和NioSocketChannel,在其構造中均會指定readInterestOp
        • super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT)
          • AbstractNioMessageServerChannel
        • super(parent, eventLoop, ch, SelectionKey.OP_READ)
          • AbstractNioChannel

    客戶端創建

    • Bootstrap是Socket客戶端創建工具類,用戶通過Bootstrap可以方便地創建的客戶端并發起異步 TCP連接操作
    • Netty客戶端創建流程分析
      • 用戶線程創建Bootstrap實例,通過API設置創建客戶端相關的參數,異步發起客戶端連接
      • 創建處理客戶端連接、I/O讀寫的Reactor線程組NioEventLoopGroup。可以通過構造函數指定I/O線程的個數,默認為CPU內核數的2倍
      • 通過Bootstrap的ChannelFactory和用戶指定的Channel類型創建用于客戶端連接的NioSocketChannel,它的功能類似于JDK NIO類庫提供的SocketChannel
      • 創建默認的Channel Handler Pipeline, 用于調度和執行網絡事件
        • 注意這里調用的是父類的handler方法
      • 異步發起TCP連接,判斷連接是否成功。如果成功,則直接將NioSocketChannel注冊到多路復用器上,監聽讀操作位,用于數據報讀取和消息發送:如果沒有立即連接成功,則注冊連接監聽位到多路復用器,等待連接結果
      • 注冊對應的網絡監聽狀態位到多路復用器
      • 由多路復用器在1/0現場中輪詢各Channel, 處理連接結果
      • 如果連接成功,設置Future結果,發送連接成功事件,觸發ChannelPipeline執行
      • 由ChannelPipeline調度執行系統和用戶的ChannelHandler, 執行業務邏輯
    • Netty客戶端創建源碼分析
      • Bootstrap是 Netty提供的客戶端連接工具類,主要用于簡化客戶端的創建
      • 客戶端相對于服務端,只需要一個處理I/O讀寫的線程組即可
      • Bootstrap也提供了客戶端TCP參數設置接口
        • SO_TIMEOUT 控制讀取操作將阻塞多少毫秒
        • SO_SNDBUF 套接字使用的發送緩沖區大小
        • SO_RCVBUF 套接字使用的接收緩沖區大小
        • SO_REUSEADDR
          • 用于決定如果網絡上仍然有數據向舊的ServerSocket傳輸數據 ,是否允許新的 ServerSocket綁定到與舊的ServerSocket同樣的端口上
        • CONNECTTIMEOUTMILLIS
          • 客戶端連接超時時間,由于NIO原生的客戶端并不提供設置連接超時的接口,因此 , Netty采用的是自定義連接超時定時器負責檢測和超時控制
        • TCPN0DELAY
          • 激活或禁止TCP_NODELAY套接字選項,它決定是否使用Nagle算法。如果是時延敏感型的應用,建議關閉Nagle算法
      • 對于TCP客戶端連接,默認使用NioSocketChannel
        • BootstrapChannelFactory利用channelClass類型信息 , 通過反射機制創建NioSocketChannel對象
      • Bootstrap為了簡化Handle 的編排,提供 Channellnitializer,它繼承了 ChannelHandlerAdapter, 當 TCP鏈路注冊成功之后,調用initChannel接口,用于設置用戶ChannelHandler
        • ChannelInitializer#channelRegistered
          • initChannel
      • ChannelFuture f = b.connect(host, port).sync(),發起客戶端連接
    • 客戶端連接操作
      • Bootstrap#connect
        • doConnect
          • 首先要創建和初始化NioSocketChannel
            • AbstractBootstrap#initAndRegister
              • createChannel-channelFactory().newChannel(group().next())
                • 這里的group為父類的parentGroup
              • init(channel)
              • 初始化Channel之后,將其注冊到Selector上
                • channel.unsafe().register(regFuture)
          • doConnect0
            • 從該操作開始,連接操作切換到了Netty的NIO線程NioEventLoop中進行,此時客戶端返回,連接操作異步執行
          • doConnectO最終調用HeadHandler的connect方法
            • AbstractNioUnsafe#connect
              • NioSocketChannel#doConnect
                • javaChannel().socket().bind(localAddress)
                • boolean connected = javaChannel().connect(remoteAddress)
                • if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT)}
              • 異步連接返回之后,需要判斷連接結果,如果連接成功,則觸發ChannelActive事件,否則注冊SelectionKey.OP_CONNECT到多路復用器
      • 異步連接結果通知
        • NioEventLoop的 Selector輪詢客戶端連接Channel,當服務端返回握手應答之后,對連接結果進行判斷
          • NioEventLoop#processSelectedKey
          • 監聽SelectionKey.OP_CONNECT
            • AbstractNioUnsafe#finishConnect
              • doFinishConnect
                • NioSocketChannel#doFinishConnect,判斷JDK的SocketChannel的連接結果
              • 連接成功之后,調用fulfillConnectPromise方法,觸發鏈路激活事件,該事件由ChannelPipeline進行傳播
    • 客戶端連接超時機制
      • 在創建Netty客戶端的時候,可以通過ChannelOption.CONNECTTIMEOUTMILLIS配置項設置連接超時時間
      • 發起連接的同時,啟動連接超時檢測定時器
        • AbstractNioUnsafe.connect
          • 一旦超時定時器執行,說明客戶端連接超時
          • 如果在連接超時之前獲取到連接結果,則刪除連接超時定時器,防止其被觸發
            • AbstractNioUnsafe#finishConnect中finally塊的處理

    ByteBuf和相關輔助類

    • NIO的ByteBuffer的主要缺點
      • 長度固定,一旦分配完成,它的容量不能動態擴展和收縮
      • 只有一個標識位置的指針position, 讀寫的時候需要手工調用flip和rewind等,使用者必須小心謹慎地處理這些API
      • 一些高級和實用的特性不支持
    • Netty的ByteBuf
      • 7種 ava基礎類型、byte數組、ByteBuffer (ByteBuf) 等的讀寫
      • 緩沖區自身的copy和 slice等
      • 設置網絡字節序
      • 構造緩沖區實例
      • 操作位置指針等方法
        • ByteBuf通過兩個位置指針來協助緩沖區的讀寫操作,讀操作使用readerlndex, 寫操作使用 writerlndex
        • 由于寫操作不修改readerlndex指針,讀操作不修改writerlndex指針,因此讀寫之間不再需要調整位置指針,這極大地簡化了緩沖區的讀寫操作
      • ByteBuf會自動進行動態擴展
    • ByteBuf功能介紹
      • 順序讀操作(read)
      • 順序寫操作(write)
      • readerlndex和writerlndex
        • 調用 ByteBuf的read 操作時,從readerlndex處開始讀取。readerlndex到 writerlndex之間的空間為可讀的字節緩沖區;從 writerlndex到 capacity之間為可寫的字節緩沖區;0到readerlndex 之間是已經讀取過的緩沖區,可以調用discardReadBytes操作來重用這部分空間,以節約內存,防止ByteBuf的動態擴張
      • Discardable bytes
        • 將已讀的字節部分丟棄
        • 調用discardReadBytes會發生字節數組的內存復制,所以,頻繁調用將會導致性能下降
      • Readable bytes 和 Writable bytes
        • 可讀空間段是數據實際存儲的區域,以read或者skip開頭的任何操作都將會從 readerlndex開始讀取或者跳過指定的數據,操作完成之后readerlndex增加了讀取或者跳 過的字節數長度
        • 可寫空間段是尚未被使用可以填充的空閑空間,任何以write開頭的操作都會從writerlndex開始向空閑空間寫入字節,操作完成之后writerlndex增加了寫入的字節數長度
      • Clear操作
      • Mark和Rest操作
      • 查找操作
      • Derived buffers
        • duplicate 返回當前ByteBuf的復制對象,共享緩沖區,讀寫索引獨立
        • copy復制一個新的ByteBuf對象,內容和索引都獨立
        • slice 可讀子緩沖區(起始位置從readerlndex到 writerlndex),共享內容,讀寫索引獨立維護
      • 轉換成標準的ByteBuffer
        • ByteBuf#nioBuffer、nioBuffer(index,length)
        • 返回后的ByteBuffer無法感知原ByteBuf的動態擴展操作
      • 隨機讀寫(set和 get)
    • 源碼概要分析
      • 從內存分配的角度看,ByteBuf可以分為兩類
        • 堆內存(HeapByteBuf) 字節
          • 可以被JVM 自動回收;缺點就是如果進行Socket的 I/O 讀寫,需要額外做一次內存復制,將堆內存對應的緩沖區復制到內核Channel中
        • 直接內存(DirectByteBuf) 字節
          • 區:非堆內存,它在堆外進行內存分配;但是將它寫入或者從Socket Channel中讀取時,由于少了一次內存復制,速度比堆內存快
        • ByteBuf的最佳實踐是在I/O通信線程的讀寫緩沖區使用DirectByteBuf, 后端業務消息的編解碼模塊使用HeapByteBuf, 這樣組合可以達到性能最優
        • 從內存回收角度看,ByteBuf也分為兩類:基于對象池的ByteBuf和普通ByteBuf。兩者的主要區別就是基于對象池的ByteBuf可以重用ByteBuf對象,它自己維護了一個內存池,可以循環利用創建的ByteBuf,提升內存的使用效率,降低由于高負載導致的頻繁GC
      • AbstractByteBuf
        • ByteBuffer的一個最大的缺點就是一旦完成分配之后不能動態調整其容量。由于很多場景下我們無法預先判斷需要編碼和解碼的POJO對象長度,因此只能根據經驗數據給個估計值。如果這個值偏大,就會導致內存的浪費;如果這個值偏小,遇到大消息編碼的時候就會發生緩沖區溢出異常。使用者需要自己捕獲這個異常,并重新計算緩沖區的大小,將原來的內容復制到新的緩沖區中,然后重置指針。這種處理策略對用戶非常不友好,而且稍有不慎,就會引入新的問題
        • 采用倍增或者步進算法,動態擴張需要進行內存復制,頻繁的內存復制會導致性能下降;采用先倍增后步進
      • AbstractReferenceCountedByteBuf
        • 引用計數
        • ReferenceCounted
          • A reference-counted object that requires explicit deallocation
          • retain
            • increases the reference count
          • release
            • decreases the reference count
      • UnpooledHeapByteBuf
        • UnpooledHeapByteBuf是基于堆內存進行內存分配的字節緩沖區,它沒有基于對象池 技術實現,這就意味著每次I/O 的讀寫都會創建個新的UnpooledHeapByteBuf
      • PooledByteBuf
        • PoolArena,Netty的內存池實現類
        • 為了集中管理內存的分配和釋放,同時提高分配和釋放內;時候的性能,很多框架和應用都會通過預先申請一大塊內存,然后通過提供相應的分配和釋放接口來使用內存。這樣一來,對內存的管理就被集中到幾個類或者函數中,由于不再頻繁使用系統調用來申請和釋放內存,應用或者系統的性能也會大大提髙。在這種設計思路下,預先申請的那一大塊內存就被稱為Memory Arena
        • Netty的 PoolArena是由多個Chunk組成的大塊內存區域,而每個Chunk則由一個或者多個Page組成
        • 由于采用內存池實現,所以新創建PooledDirectByteBuf對象時不能直接new —個實例,而是從內存池中獲取,然后設置引用計數器的值
      • ByteBufHolder
        • 相當于協議的消息體容器
      • ByteBufAllocator
        • responsible to allocate buffers
      • CompositeByteBuf
        • A virtual buffer which shows multiple buffers as a single merged buffer
        • CompositeByteBuf允許將多個ByteBuf的實例組裝到一起,形成一個統一的視圖
        • 如某個協議POJO對象包含兩部分:消息頭和消息體,它們都是ByteBuf對象。當需要對消息進行編碼的時候需要進行整合
      • ByteBufUtil
        • A collection of utility methods that is related with handling ByteBuf
        • encodeString、decodeString、hexDump

    Channel 和 Unsafe

    • 類似于NIO的 Channel, Netty提供了自己的Channel和其子類實現,用于異步I/O操作和其他相關的操作
    • Unsafe是個內部接口,聚合在Channel中協助進行網絡讀寫相關的操作,因為它的設計初衷就是Channel的內部輔助類,不應該被Netty框架的上層使用者調用,所以被命名為Unsafe
    • io.netty.channd.Channel是Netty網絡操作抽象類,它聚合了一組功能,包括但不限于網路的讀、寫,客戶端發起連接,主動關閉連接,鏈路關閉,獲取通信雙方的網絡地址等。它也包含了Netty框架相關的一些功能,包括獲取該Channel的EventLoop , 獲取緩沖分配器 ByteBufAllocator和pipeline等
    • 為什么不使用JDK NIO 原生的Channel
      • JDK的SocketChannel和ServerSocketChannel沒有統一的Channel接口
      • JDK的SocketChannel和ServerSocketChannel的主要職責就是網絡 I/O 操作,由于它們是SPI(service provider interface)類接口,由具體的虛擬機廠家來提供;直接實現SocketChannel 和 ServerSocketChannel抽象類,其工作量和重新開發一個新的 Channel 功能類是差不多的
      • Netty的Channel需要能夠跟 Netty的整體架構融合在一起,例如 I/O 模型、基于 ChannelPipeline 的定制模型,以及基于元數據描述配置化的 TCP參數等,這些 JDK的SocketChannel和 ServerSocketChannel都沒有提供,需要重新封裝
      • 自定義的Channel, 功能實現更加靈活
    • Channel的功能介紹
      • 網絡I/O操作
        • read、write、flush、close、disconnect、close、connect、bind等
        • ctx.close() starts to flow through the ChannelPipeline from the point of the ChannelHandlerContext while ctx.channel().close() will start from the tail of the ChannelPipeline all the time
        • 從NioSocketChannel的doDisconnect實現來看,其直接調用了doClose,所以從TCP一層面上可以理解,disconnect和close一樣
      • 其他
        • 通過eventLoop()方法可以獲取到Channel注冊的EventLoop
        • 通過metadata()方法就可以獲取當前Channel的TCP參數配置
        • parent()。對于服務端Channel而言,它的父Channel為空:對于客戶端Channel, 它的 Channel就是創建它的ServerSocketChannel
        • 用戶獲取Channel標識的id
          • ChannelId、DefaultChannelId
    • Channel源碼分析
      • AbstractChannel
        • 聚合了所有Channel使用到的能力對象,由AbstractChannel提供初始化和統一封裝
        • 網絡讀寫操作會觸發CharmelPipeline對應的事件方法。Netty基于事件驅動,我們也可以理解為當Chnanel進行I/O 操作時產生生對應的I/O 事件,然后驅動事件在ChannelPipeline中傳播,由對應的ChannelHandler對事件進行攔截和處理
        • 網絡 I/O 操作直接調用 DefaultChannelPipeline 的相關方法,由DefaultChannelPipeline中對應的ChannelHandler進行具體的邏輯處理
        • 提供了一些公共API具體實現 例如localAddress()和remoteAddress()
      • AbstractNioChannel
        • Abstract base class for {@link Channel} implementations which use a Selector based approach.
        • doRegister、doBeginRead
      • AbstractNioByteChannel
        • {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes
        • doWrite,注意處理半包
          • 注意環形數組-ChannelOutboundBuffer#Entry[] buffer
      • AbstractNioMessageChannel
        • {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages
        • 一個發送的是ByteBuf或者FileRegion, 它們可以直接被發送:另一個發送的則是POJO對象
      • AbstractNioMessageServerChannel
        • 定義了一個 EventLoopGroup 類型的childGroup, 用于給新接入的客戶端NioSocketChannel分配EventLoop
      • NioServerSocketChannel
        • 很多方法實現通過調用javaChannel
        • doReadMessages,javaChannel().accept()
          • new NioSocketChannel(this, childEventLoopGroup().next(), ch)
      • NioSocketChannel
        • doConnect
          • javaChannel().socket().bind
          • javaChannel().connect
        • doWrite
          • ChannelConfig#getWriteSpinCount:Returns the maximum loop count for a write operation
        • doReadBytes
          • Read bytes into the given ByteBuf and return the amount
    • Unsafe
      • Unsafe接口實際上是Channel接口的輔助接口
      • 實際的I/O讀寫操作都是由Unsafe接口負責完成的
      • AbstractChannel$AbstractUnsafe
        • register
        • bind
        • disconnect
          • doDisconnect() might have closed the channel
        • close
          • 從disconnect和close的實現來看,close做的工作更多,如首先判斷是否處于刷新狀態,如果處于刷新狀態說明還有消息尚未發送出去,需要等到所有消息發送完成再關閉鏈路等
        • write
          • 實際上將消息添加到環形發送數組中
        • flush#flush0#doWrite
        • AbstractNioUnsafe
        • NioByteUnsafe
          • read
          • AdaptiveRecvByteBufAllocator,緩沖區大小可以動態調整的ByteBuf分配器
            • Netty根據上次實際讀取的碼流大小對下次的接收Buffer緩沖區進行預測和調整,能夠最大限度地滿足不同行業的應用場景
            • 其根據本次讀取的實際字節數對下次接收緩沖區的容量進行動態調整
          • doReadBytes
          • 完成一次異步讀之后,就會觸發一次ChannelRead事件
            • 在沒有做仟何半包處理的情況下,以 ChannelRead的觸發次數做計數器來進行性能分析和統計,是完全錯誤的
          • 連續讀操作做上限控制,默認值為16次,無論TCP緩沖區有多少碼流需要讀取,只要連續16次沒有讀完,都需要強制退出,等待下次selector輪詢周期再執行
          • 完成多路復用器本輪讀操作之后,觸發ChannelReadComplete事件

    ChannelPipeline和ChannelHandler

    • Netty的 Channel過濾器實現原理與ServletFilter機制一致,它將Channel的數據管道抽象為 ChannelPipeline,消息在 ChannelPipeline 中流動和傳遞。ChannelPipeline 持有 I/O 事件攔截器ChannelHandler的鏈表,由ChannelHandler對 I/O 事件進行攔截和處理,可以 方便地通過新增和刪除ChannelHandler來實現不同的業務邏輯定制,不需要對已有的ChannelHandler進行修改,能夠實現對修改封閉和對擴展的支持
    • ChannelPipeline 的事件處理
      • 底層的SocketChannel read方法讀取 ByteBuf,觸發 ChannelRead 事件,由 I/O線程 NioEventLoop 調用 ChannelPipeline 的 fireChannelRead(Object msg)方法,將消息 (ByteBuf) 傳輸到 ChannelPipeline中
      • 消息依次被 HeadHandler、ChannelHandlerl、ChannelHandler2 ... TailHandler 攔 截和處理,在這個過程中,任何ChannelHandler都可以中斷當前的流程,結束消息的傳遞
      • 調用ChannelHandlerContext的 write方法發送消息,消息從TailHandler開始,途經 ChannelHandlerN ... ChannelHandlerl、HeadHandler, 最終被添加到消息發送緩沖區中等待刷新和發送,在此過程中也可以中斷消息的傳遞
      • Netty中的事件分為inbound事件和outbound事件。inbound事件通常由I/O 線程觸發
        • ChannelHandlerContext#fireChannelRegistered
        • ChannelHandlerContext#fireChannelActive
        • ChannelHandlerContext#fireChannelRead
        • ChannelHandlerContext#fireChannelReadComplete
        • ChannelHandlerContext#fireExceptionCaught
        • ChannelHandlerContext#fireUserEventTriggered
        • ChannelHandlerContext#fireChannelWritabilityChanged
        • ChannelHandlerContext#fireChannelInactive
          • 舉例如DefaultChannelHandlerContext#fireChannelRead
            • findContextInbound(MASKCHANNELREAD)
              • Inbound
      • Outbound事件通常是由用戶主動發起的網絡I/O操作
        • bind、connect、write、flush、read、disconnect、close
          • 舉例如DefaultChannelHandlerContext#write
            • findContextOutbound(MASK_WRITE)
              • Outbound
    • 自定義攔截器
      • 通常ChannelHandler只需要繼承 ChannelHandlerAdapter類覆蓋自己關心的方法即可
    • 構建pipeline
      • 使用ServerBootstrap或者Bootstrap啟動服務端或者客戶端時,Netty會為每個Channel連接創建個獨立的pipeline
      • 對于使用者而言,只需要將自定義的攔截器加入到pipeline中即可
      • 對于類似編解碼這樣的ChannelHandler,它存在先后順序
      • Pipeline支持指定位置添加或者刪除攔截器
    • ChannelPipeline源碼分析
      • ChannelPipeline支持運行期動態修改,線程安全
      • 其內部維護里一個鏈表(DefaultChannelHandlerContext.next/prev)和name2ctx的Map
    • ChannelPipeline 的 inbound 事件
      • pipeline中以fireXXX命名的方法都是從I/O線程流向用戶業務Handler的inbound事件
        • head.fireXXX 調用HeadHandler對應的fireXXX方法
          • DefaultChannelHandlerContext#fireXXX
        • 執行相關邏輯
    • ChannelPipeline的outbound 事件
      • 由用戶線程或者代碼發起的I/O操作被稱為outbound事件
      • Pipeline本身并不直接進行 I/O 操作,在前面對Channel和Unsafe的介紹中我們知道 最終都是由Unsafe和Channel 來實現真正的 I/O操作的。 Pipeline負責將 I/O 事件通TailHandler 行調度和傳播,最終調用Unsafe的I/O方法進行I/O操作
      • 整體由NioEventLoop進行驅動
    • ChannelHandler
      • 基于ChannelHandler接口,用戶可以方便地進行業務邏輯定制
      • ChannelHandler 支持注解,Shareble和Skip(被Skip注解的方法不會被調用)
    • ChannelHandlerAdapter
      • 所有方法都被加了@Skip注解,這些方法在執行的過程中會被忽略,直接跳到下一個ChannelHandler中執行對應的方法
    • ByteToMessageDecoder、MessageToMessageDecoder
    • LengthFieldBasedFrameDecoder
      • lengthFieldOffset、lengthFieldLength、lengthAdjustment、initialBytesToStrip
        • the offset of the length field
        • the length of the length field
        • the compensation value to add to the value of the length field
        • the number of first bytes to strip out from the decoded frame
          • lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength
          • frameLength = getUnadjustedFrameLength 根據offset和fieldLen獲取幀長度
          • frameLength += lengthAdjustment + lengthFieldEndOffset
          • int actualFrameLength = frameLengthInt - initialBytesToStrip
            • 可以看到這個lengthAdjustment是個加法
            • in some protocols, the length field represents the length of the whole message, including the message header
      • 詳見LengthFieldBasedFrameDecoder的javadoc注釋和源代碼
    • MessageToByteEncoder
    • MessageToMessageEncoder
    • LengthFieldPrepender
      • An encoder that prepends the length of the message
      • lengthIncludesLengthFieldLength,消息長度將包含長度本身占用的字節數
        • length = msg.readableBytes() + lengthAdjustment
        • if (lengthIncludesLengthFieldLength) length += lengthFieldLength

    EventLoop和EventLoopGroup

    • Reactor單線程模型,是指所有的I/O操作都在同一個NIO線程上面完成
      • 由于Reactor模式使用的是異步非阻塞I/O, 所有的I/O操作都不會導致阻塞,理論上一個線程可以獨立處理所有I/O相關的操作
    • Reactor多線程模型與單線程模型最大的區別就是有一組NIO線程來處理I/O操作
      • 有專門一個NIO線程 — Acceptor線程用于監聽服務端
      • 網絡I/O操作讀、寫等由一個NIO線程池負責
      • 1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應一個NIO線程
        • 1個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題
    • 主從Reactor多線程模型
      • 服務端用于接收客戶端連接的不再是一個單獨的NIO線程,而是一個獨立的NIO線程池
    • 最佳實踐
      • 創建兩個NioEventLoopGroup,用于邏輯隔離NIOAcceptor和 NIO線程
      • 盡量不要在ChannelHandler中啟動用戶線程(解碼后用于將POJO消息派發到后端業務線程的除外)
      • 解碼要放在NIO線程調用的解碼Handler中進行
      • 如果業務邏輯操作非常簡單,沒有復雜的業務邏輯計算,沒有可能會導致線程被阻塞的磁盤操作、數據庫操作、網路操作等,可以直接在NIO線程上完成業務邏輯編排,不需要切換到用戶線程
      • 如果業務邏輯處理復雜,不要在NIO線程上完成,建議將解碼后的POJO消息封裝成Task, 派發到業務線程池中由業務線程執行,以保證NIO線程盡快被釋放,處理其他的I/O操作
    • NioEventLoop 設計原理
      • 并不是一個純粹的I/O線程,它除了負責I/O的讀寫之外
      • 通過調用 NioEventLoop 的 execute(Runnable task)方法實現,Netty 有很多系統Task, 創建它們的主要原因是:當 I/O 線程和用戶線程同時操作網絡資源時,為了防止并發操作導致的鎖競爭,將用戶線程的操作封裝成Task放入消息隊列中,由I/O 線程負責執行,這樣就實現了局部無鎖化
      • 通過調用NioEventLoop的schedule處理定義任務
      • NioEventLoop中的run方法是SingleThreadEventExecutor中定義的抽象方法
    • NioEventLoop
      • 作為NIO框架的Reactor線程,NioEventLoop需要處理網絡I/O讀寫事件,因此它必須聚合一個多路復用器對象
      • rebuildSelector
        • 解決the infamous epoll 100% CPU bug
        • 在某個周期(例 如 100ms) 內如果連續發生JV次空輪詢,說明觸發了 JDK NIO的epoll()死循環bug
      • 處理完I/O事件之后,NioEventLoop需要執行非I/O操作的系統Task和定時任務
        • 為了保證兩者都能得到足夠的CPU時間被執行,Netty提供了I/O比例供用戶定制
        • setIoRatio
          • The default value is{@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks

    Future 和 Promise

    • Netty強烈建議直接通過添加監聽器的方式獲取I/O操作結果
      • 當 I/O操作完成之后,I/O線程會回調ChannelFuture中 GenericFutureListener的operationComplete方法,并把ChannelFuture對象當作方法的入參
    • 異步I/O 操作有兩類超時:一個是TCP層面的I/O 超時,另一個是業務邏輯層面的操作超時
    • Promise是可寫的Future,Future自身并沒有寫操作相關的接口,Netty通 過Promise對Future進行擴展,用于設置I/O操作的結果
      • 循環判斷的原因是防止線程被意外喚醒導致的功能異常(虛假喚醒)
      • 由于在I/O線程中調用Promise的await或者sync方法會導致死鎖
        • checkDeadLock

    Netty架構剖析

    • Netty采用了典型的三層網絡架構進行設計和開發
      • Reactor通信調度層
      • 職責鏈ChannelPipeline
      • 業務邏輯編排層(Service ChannelHandler)
    • 關鍵架構質量屬性
      • 高性能
        • 采用異步非阻塞的I/O類庫,基于Reactor模式
        • TCP接收和發送緩沖區使用直接內存代替堆內存
        • 支持通過內存池的方式循環利用ByteBuf
        • 可配置的I/O 線程數、TCP參數
        • 采用環形數組緩沖區實現無鎖化并發編程
        • 合理地使用線程安全容器、原子類等
        • 關鍵資源的處理使用單線程串行化
        • 通過引用計數器及時地申請釋放不再被引用的對象
      • 可靠性
        • 鏈路有效性檢測(讀空閑超時機制、寫空閑超時機制)
        • 內存保護機制
        • 優雅停機
          • 優雅停機往往需要設置個最大超時時間T,如果達到T后系統仍然沒有退出,則通過 Kill - 9 pid強殺當前的進程
      • 可定制性
        • 責任鏈模式、基于接口的開發、提供了大量工廠類、提供了大量的系統參數供用戶按需設置
      • 可擴展性
        • 可以方便地進行應用層協議定制

    Java 多線程編程在 Netty中的應用

    • Java內存模型
      • 工作內存和主內存
      • Java內存交互協議
      • 對于SUN的 JDK,在 Windows和 Linux操作系統上采用了內核線程的實現方式
        • 這種線程由內核來完成線程切換,內核通過線程調度器對線程進行調度,并負責將線程任務映射到不同的處理器上
    • Netty的并發編程實踐
      • 對共享的可變數據進行正確的同步-synchronized
        • 鎖的范圍需要盡可能的小
      • 正確使用鎖
        • 始終使用wait循環來調用wait方法,永遠不要在循環之外調用wait方法。這樣做的原因是盡管并不滿足被喚醒條件,但是由于其他線程調用notifyAlIO方法會導致被阻塞線程意外喚醒,此時執行條件并不滿足
      • volatile的正確使用
        • 線程可見性
        • 禁止指令重排序優化
        • volatile最適合使用的是個線程寫,其他線程讀的場合
      • CAS指令和原子類
        • 悲觀鎖
        • 樂觀鎖。簡單地說,就是先進行操作,操作完成之后再判斷操作是否成功,是否有并發問題,如果有則進行失敗補償,如果沒有就算操作成功--CAS自旋
          • sun.misc.Unsafe
          • AtomicIntegerFieldUpdater
      • 線程安全類的應用
        • ConcurrentLinkedQueue
        • JDK的線程安全容器底層采用了 CAS、volatile和 ReadWriteLock實現,相比于傳統 重量級的同步鎖,采用了更輕量、細粒度的鎖,因此,性能會更高
        • Netty對 JDK的線程池進行了封裝和改造,但是,本質上仍然是利用了線程池和線程安全隊列簡化了多線程編程
      • 讀寫鎖的應用
        • HashedWheelTimer
      • 線程安全性文檔說明
        • 在 Netty中,對于一些關鍵的類庫,給出了線程安全性的API DOC
      • 不要依賴線程優先級
        • Netty中默認的線程工廠實現類,開放了包含設置線程優先級字段的構造函數。這是 個錯誤的決定.實際上JDK的線程優先級是無法跨平臺正確運行的

    高性能之道

    • I/O 通信性能三原則
      • 傳輸、協議、線程
    • Netty
      • 異步非阻塞通信
      • 高效的Reactor線程模型
        • 個人認為采用第三種所謂主從模型的話,則需要綁定多個端口,每一個端口與一個boss thread綁定
          • 實際bind的時候才會創建NioServerSocketChannel
      • 無鎖化的串行設計
      • 高效的并發編程
      • 高性能的序列化框架
      • 零拷貝
        • Netty的接收和發送ByteBuffer采用DIRECT BUFFERS
        • 第二種“零拷貝”的實現CompositeByteBuf, 它對外將多個ByteBuf封裝成1個ByteBuf
        • 很多操作系統直接將文件緩沖區的內容發送到目標Channel中,而不需要通過循環拷貝的方式
      • 內存池
        • PooledByteBufAllocator#DEFAULT#directBuffer
          • 使用內存池分配器創建直接內存緩沖區
          • 過 RECYCLER的 get方法循環使用ByteBuf對象,如果是非內存池實現,則直接創建 一個新的ByteBuf對象
            • setRefCnt方法設置引用計數器
        • Unpooled#directBuffer
          • 使用非堆內存分配器創建的直接內存緩沖區
      • 靈活的TCP參數配置能力

    可靠性

    • 網絡通信類故障
      • 客戶端連接超時
        • ChannelOption.CONNECTTIMEOUTMILLIS
        • 設置完連接超時之后,Netty在發起連接的時候,會根據超時時間創建ScheduledFuture 掛載在Reactor線程上,用于定時監測是否發生連接超時
          • AbstractNioUnsafe#connect
          • 如果在超時期限內處理完成連接操作,則取消連接超時定時任務
      • 通信對端強制關閉連接
        • 強制關閉客戶端,服務端己經監控到客戶端強制關閉了連接,釋放了連接句柄
          • I/O異常被統一處理,該異常向上拋,由NioByteUnsafe進行統一異常處理
        • 鏈路關閉
          • 己方或者對方主動關閉鏈接并不屬于異常場景,因此不會產生Exception事件通知 Pipeline
      • 定制I/O故障
        • 客戶端的斷連重連機制
        • 消息的緩存重發
        • 接口日志中詳細記錄故障細節
        • 運維相關功能,例如告警、觸發郵件/短信等
          • Netty的處理策略是發生I/O 異常,底層的資源由它負責釋放,同時將異常堆找信息 以事件的形式通知給上層用戶,由用戶對異常進行定制
      • 鏈路的有效性檢測
        • 心跳檢測的目的就是確認當前鏈路可用,對方活著并且能夠正常接收和發送消息
        • 心跳檢測機制
          • Ping-Pong型心跳:由通信一方定時發送Ping消息,對方接收到Ping消息之后,立即返回Pong應答消息給對方,屬于請求-響應型心跳
          • Ping-Ping型心跳:不區分心跳請求和應答,由通信雙方按照約定定時向對方發送心跳Ping消息,它屬于雙向心跳
            • 連續N次心跳檢測都沒有收到對方的Pong應答消息或者Ping請求消息,則認為 鏈路己經發生邏輯失效,這被稱作心跳超時
            • 讀取和發送心跳消息的時候如何直接發生了IO異常
          • Netty的心跳檢測實際上是利用了鏈路空閑檢測機制實現的
            • 讀空閑
            • 寫空閑
            • 讀寫空閑
              • 鏈路空閑的時候并沒有關閉鏈路,而是觸發IdleStateEvem事 件 ,用戶訂閱IdleStateEvent事件,用于自定義邏輯處理
        • Reactor線程的保護
          • 循環體內一定要捕獲Throwable
          • 規避NIO BUG
        • 內存保護
          • 鏈路總數的控制:每條鏈路都包含接收和發送緩沖區,鏈路個數太多容易導致內 存溢出
          • 單個緩沖區的上限控制
          • 緩沖區內存釋放
          • NIO消息發送隊列的長度上限控制
            • 緩沖區的內存泄漏保護
              • 為了提升內存的利用率,Netty提供了內存池和對象池.為了防止因為用戶遺漏導致內存泄漏,Netty在 Pipeline的尾Handler中自動對內存進行釋放
            • 實際的商用環境中,如果遇到畸形碼流攻擊、協議消息編碼異常、消息丟包等問題時,可能會解析到一個超長的長度字段
            • 流量整形
              • Netty流量整形的原理是:對每次讀取到的ByteBuf可寫字節數進行計算,獲取當前的報文流量,然后與流量整形閾值對比。如果已經達到或者超過了閾值。則計算等待時間delay, 將當前的ByteBuf放到定時任務Task中緩存,由定時任務線程池在延遲delay之后繼續處理該ByteBuf
              • 用戶可以通過參數設置:報文的接收速率、報文的發送速率、整形周期
              • Netty也支持鏈路級的流量整形
        • 優雅停機接口
    • 優化建議

      • 發送隊列容量上限控制
        • 如果網絡對方處理速度比較慢,導致TCP滑窗長時間為0 ; 或者消息發送方發送速度 過快,或者一次批量發送消息量過大,都可能會導致ChannelOutboundBuffer的內存膨脹, 這可能會導致系統的內存溢出
        • 過啟動項的ChannelOption設置發送隊列的長度或者通過-D 啟動參數配置該長度
      • 回推發送失敗的消息
        • Mina的實現,當發生鏈路異常之后,Mina會將尚未發送的整包消息隊列封裝到異常對象中,然后推送給用戶Handler, 由用戶來決定后續的處理策略
    • 安全性

      • Netty通過Ssmandler提供了對SSL的支持,它支持的SSL協議類型包括:SSLV2、SSLV3 和 TLS
        • SSL單向認證
        • SSL雙向認證
        • 第三方CA認證
      • Netty擴展的安全特性
        • IP地址黑名單機制
        • 接入認證

    展望

    • 屬性配置
      • 獲取見:SystemPropertyUtil
      • 如:io.netty.noUnsafe、io.netty.eventLoopThreads(默認cpu個數*2)
posted on 2017-01-19 22:00 landon 閱讀(2910) 評論(0)  編輯  收藏 所屬分類: Book

只有注冊用戶登錄后才能發表評論。


網站導航:
 
魔法糖果闯关 黑龙江十一选五近期中奖号码 3d绝杀号公式 股票涨跌怎么算公式 大乐透走势图2 可以赚钱的靠谱软件吗 今天体彩大乐透开什么号码查询 今日股票推荐短线个股推荐 十一选五浙江十一选五走势图基本走势 厦港真真加盟赚钱吗 青鹏棋牌官网 双色球复式投注矩阵图 炉石卡包淘宝靠什么赚钱吗 云南十一选五计划预测 腾讯分分彩走势图0 981棋牌最新版安装 青海十一选五彩票控基础