h5高端網(wǎng)站建設(shè)谷歌搜索引擎在線
??雖然了解了整個內(nèi)存池管理的細節(jié),包括它的內(nèi)存分配的具體邏輯,但是每次從NioSocketChannel中讀取數(shù)據(jù)時,應該分配多少內(nèi)存去讀呢? 例如,客戶端發(fā)送的數(shù)據(jù)為1KB , 應該分配多少內(nèi)存去讀呢? 例如: 客戶端發(fā)送的數(shù)據(jù)為1KB , 若每次都分配8KB的內(nèi)存去讀取數(shù)據(jù),則會導致內(nèi)存大量浪費,若分配16B的內(nèi)存去讀取數(shù)據(jù),那么需要64次才能全部讀完, 對性能的有很大的影響 , 那么對于 這個問題,Netty是如何解決的呢?
??NioEventLoop線程在處理OP_READ事件,進入NioByteUnsafe循環(huán)讀取數(shù)據(jù)時,使用了兩個類來處理內(nèi)存的分配,一個是ByteBufAllocator, PooledByteBufAllocator為它的默認實現(xiàn)類, 另一個是RecvByteBufAllocator,AdaptiveRecvByteBufAllocator是它的默認實現(xiàn)類,在DefaultChannelConfig初始化時設(shè)置 , PooledByteBufAllocator主要用來處理內(nèi)存分配,并最終委托PoolArena去完成,AdaptiveRecvByteBufAllocator主要用來計算每次讀循環(huán)時應該分配多少內(nèi)存,NioByteUnsafe之所有需要循環(huán)讀取,主要是因為分配的初始ByteBuf不一定能夠容納讀取到的所有數(shù)據(jù),NioByteUnsafe循環(huán)讀取的核心代碼解讀如下 :
public final void read() {// 獲取pipeline通道配置,Channel管道final ChannelConfig config = config();// socketChannel已經(jīng)關(guān)閉if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();// 獲取內(nèi)存分配器,默認為PooledByteBufAllocatorfinal ByteBufAllocator allocator = config.getAllocator();// 獲取RecvByteBufAllocator內(nèi)部的計算器Handlefinal RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();// 清空上一次讀取的字節(jié)數(shù),每次讀取時均重新計算// 字節(jié)buf分配器, 并計算字節(jié)buf分配器HandlerallocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {//當對端發(fā)送一個超大的數(shù)據(jù)包時,TCP會拆包。// OP_READ事件只會觸發(fā)一次,Netty需要循環(huán)讀,默認最多讀16次,因此ChannelRead()可能會觸發(fā)多次,拿到的是半包數(shù)據(jù)。// 如果16次沒把數(shù)據(jù)讀完,沒有關(guān)系,下次select()還會繼續(xù)處理。// 對于Selector的可讀事件,如果你沒有讀完數(shù)據(jù),它會一直返回。do {// 分配內(nèi)存 ,allocator根據(jù)計算器Handle計算此次需要分配多少內(nèi)存并從內(nèi)存池中分配// 分配一個ByteBuf,大小能容納可讀數(shù)據(jù),又不過于浪費空間。byteBuf = allocHandle.allocate(allocator);// 讀取通道接收緩沖區(qū)的數(shù)據(jù) , 設(shè)置最后一次分配內(nèi)存大小加上每次讀取的字節(jié)數(shù)// doReadBytes(byteBuf):ByteBuf內(nèi)部有ByteBuffer,底層還是調(diào)用了SocketChannel.read(ByteBuffer)// allocHandle.lastBytesRead()根據(jù)讀取到的實際字節(jié)數(shù),自適應調(diào)整下次分配的緩沖區(qū)大小。allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.// 若沒有數(shù)據(jù)可讀,則釋放內(nèi)存byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.// 當讀到-1時, 表示Channel 通道已經(jīng)關(guān)閉// 沒有必要再繼續(xù)readPending = false;}break;}// 更新讀取消息計數(shù)器, 遞增已經(jīng)讀取的消息數(shù)量allocHandle.incMessagesRead(1);readPending = false;// 通知通道處理讀取數(shù)據(jù),觸發(fā)Channel管道的fireChannelRead事件pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());// 讀取操作完畢 ,讀結(jié)束后調(diào)用,記錄此次實際讀取到的數(shù)據(jù)大小,并預測下一次內(nèi)存分配大小allocHandle.readComplete();// 觸發(fā)Channel管道的fireChannelReadComplete事件pipeline.fireChannelReadComplete();if (close) {// 如果Socket通道關(guān)閉,則關(guān)閉讀操作closeOnRead(pipeline);}} catch (Throwable t) {// 處理讀取異常handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {// 若操作完畢,且沒有配置自動讀// 則從選擇Key興趣集中移除讀操作事件removeReadOp();}}} }
??每一次創(chuàng)建byteBuf分配內(nèi)存大小是多大呢? 這個由allocate()方法內(nèi)部的guess()方法來決定 。
public ByteBuf allocate(ByteBufAllocator alloc) {return alloc.ioBuffer(guess()); }
??如果是第一次 調(diào)用guess()方法,默認分配1024B的內(nèi)存空間 ,后面分配內(nèi)存大小動態(tài)調(diào)節(jié) 。
// 實現(xiàn)doReadBytes()方法,從SocketChannel中讀取數(shù)據(jù)。 protected int doReadBytes(ByteBuf byteBuf) throws Exception {// 獲取計算內(nèi)存分配器Handlefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();// 設(shè)置嘗試讀取字節(jié)數(shù)組的buf的可寫字節(jié)數(shù)allocHandle.attemptedBytesRead(byteBuf.writableBytes());// 從Channel中讀取字節(jié)并寫入到buf中,返回讀取的字節(jié)數(shù)return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
??在這里,我們需要明白byteBuf.writableBytes()這個方法,writableBytes()方法的返回值為byteBuf中可寫的字節(jié)數(shù),內(nèi)部計算方法用byteBuf的容量- byteBuf的寫索引得出,而byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());這一行代碼,實際上就是將Channel中的數(shù)據(jù)寫入到byteBuf中,返回值為實際寫入到ByteBuf中的字節(jié)數(shù)。
??RecvByteBufAllocator的默認實現(xiàn)類AdaptiveRecvByteBufAllocator是實際的緩沖管理區(qū),這個類可以根據(jù)讀取到的數(shù)據(jù)預測所需要的字節(jié)的多少,從而自動增加或減少,如果上一次讀循環(huán)將緩沖區(qū)的寫滿了,那么預測的字節(jié)數(shù)會變大,如果連續(xù)兩次循環(huán)都不能填滿已經(jīng)分配的緩沖區(qū),則預測字節(jié)數(shù)會變小。
public void lastBytesRead(int bytes) {// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.// This helps adjust more quickly when large amounts of data is pending and can avoid going back to// the selector to check for more data. Going back to the selector can add significant latency for large// data transfers.// 如果上 一次讀循環(huán)將緩沖區(qū)填充滿了,那么預測的字節(jié)數(shù)會變大if (bytes == attemptedBytesRead()) {// 如果此次讀取將緩沖區(qū)填充滿了,增加一次記錄的機會record(bytes);}super.lastBytesRead(bytes); }// 該方法的參數(shù)是一次讀取操作中實際讀取到的數(shù)據(jù)大小,將其與nextReceiveBufferSize 進行比較,如果實際字節(jié)數(shù)actualReadBytes大于等于該值,則立即更新nextReceiveBufferSize , // 其更新后的值與INDEX_INCREMENT有關(guān)。INDEX_INCREMENT為默認常量,值為4。也就是說在擴容時會一次性增大多一些,以保證下次有足夠空間可以接收數(shù)據(jù)。而相對擴容的策略, // 縮容策略則實際保守些,常量為INDEX_INCREMENT,值為1,同樣也是進行對比, 但不同的是,若實際字節(jié)小于所用nextReceiveBufferSize,并不會立馬進行大小調(diào)整, // 而是先把 decreaseNow 設(shè)置為true,如果下次仍然小于,則才會減少nextReceiveBufferSize的大小 private void record(int actualReadBytes) {// 如果小了兩個數(shù)量級,則需要縮容if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {if (decreaseNow) { // 若減少標識decreaseNow連續(xù)兩次為true, 則說明下次讀取字節(jié)數(shù)需要減少SIZE_TABLE下標減1index = max(index - INDEX_DECREMENT, minIndex);nextReceiveBufferSize = SIZE_TABLE[index];decreaseNow = false;} else {decreaseNow = true; // 第一次減少,只做記錄}} else if (actualReadBytes >= nextReceiveBufferSize) { // 實際讀取的字節(jié)大小要大于或等于預測值index = min(index + INDEX_INCREMENT, maxIndex); // SIZE_TABLE 下標 + 4nextReceiveBufferSize = SIZE_TABLE[index]; // 若當前緩存為512,則變成 512 * 2 ^ 4decreaseNow = false;} }public void lastBytesRead(int bytes) {// 設(shè)置最后讀取的字節(jié)數(shù)lastBytesRead = bytes;if (bytes > 0) {// 總讀取的字節(jié)數(shù)totalBytesRead += bytes;} }
??上述過程中,SIZE_TABLE是什么呢? 請看AdaptiveRecvByteBufAllocator源碼實現(xiàn)。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {static final int DEFAULT_MINIMUM = 64; // 接收緩沖區(qū)的最小長度下限static final int DEFAULT_INITIAL = 1024; // 接收緩沖區(qū)的最大長度上限static final int DEFAULT_MAXIMUM = 65536; // 接收緩沖區(qū)最大長度上限// 在調(diào)整緩沖區(qū)大小時,若是增加緩沖區(qū)容量,那么增加的索引值。// 比如,當前緩沖區(qū)的大小為SIZE_TABLE[20],若預測下次需要創(chuàng)建的緩沖區(qū)需要增加容量大小,// 則新緩沖區(qū)的大小為SIZE_TABLE[20 + INDEX_INCREMENT],即SIZE_TABLE[24]private static final int INDEX_INCREMENT = 4; // 擴容增長量// 在調(diào)整緩沖區(qū)大小時,若是減少緩沖區(qū)容量,那么減少的索引值。// 比如,當前緩沖區(qū)的大小為SIZE_TABLE[20],若預測下次需要創(chuàng)建的緩沖區(qū)需要減小容量大小,// 則新緩沖區(qū)的大小為SIZE_TABLE[20 - INDEX_DECREMENT],即SIZE_TABLE[19]private static final int INDEX_DECREMENT = 1; // 擴容減少量private static final int[] SIZE_TABLE;// 分配了一個int類型的數(shù)組,并進行了數(shù)組的初始化處理, 從實現(xiàn)來看,該數(shù)組的長度是53,前32位是16的倍數(shù),value值是從16開始的,到512,從33位開始,值是前一位的// 兩倍,即從1024,2048 , 到最大值 1073741824 。static {List<Integer> sizeTable = new ArrayList<Integer>();for (int i = 16; i < 512; i += 16) {sizeTable.add(i);}for (int i = 512; i > 0; i <<= 1) {sizeTable.add(i);}SIZE_TABLE = new int[sizeTable.size()];for (int i = 0; i < SIZE_TABLE.length; i++) {SIZE_TABLE[i] = sizeTable.get(i);}System.out.println("================");}/*** @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.*/public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();// 入?yún)⑹且粋€大小,然后利用二分查找法對該數(shù)組進行size定位 ,目標是為了找出該size值在數(shù)組中的下標位置 , 主要是為了初始化maxIndex, maxIndex這兩個參數(shù)private static int getSizeTableIndex(final int size) {for (int low = 0, high = SIZE_TABLE.length - 1; ; ) {if (high < low) {return low;}if (high == low) {return high;}int mid = low + high >>> 1;int a = SIZE_TABLE[mid];int b = SIZE_TABLE[mid + 1];if (size > b) {low = mid + 1;} else if (size < a) {high = mid - 1;} else if (size == a) {return mid;} else {return mid + 1;}}}private final int minIndex;private final int maxIndex;private final int initial;/*** Creates a new predictor with the default parameters. With the default* parameters, the expected buffer size starts from {@code 1024}, does not* go down below {@code 64}, and does not go up above {@code 65536}.*/public AdaptiveRecvByteBufAllocator() {this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);}/*** Creates a new predictor with the specified parameters.* @param minimum the inclusive lower bound of the expected buffer size* @param initial the initial buffer size when no feed back was received* @param maximum the inclusive upper bound of the expected buffer size*/public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {checkPositive(minimum, "minimum");if (initial < minimum) {throw new IllegalArgumentException("initial: " + initial);}if (maximum < initial) {throw new IllegalArgumentException("maximum: " + maximum);}int minIndex = getSizeTableIndex(minimum);if (SIZE_TABLE[minIndex] < minimum) {this.minIndex = minIndex + 1;} else {this.minIndex = minIndex;}int maxIndex = getSizeTableIndex(maximum);if (SIZE_TABLE[maxIndex] > maximum) {this.maxIndex = maxIndex - 1;} else {this.maxIndex = maxIndex;}this.initial = initial;}@Overridepublic Handle newHandle() {return new HandleImpl(minIndex, maxIndex, initial);}@Overridepublic AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {super.respectMaybeMoreData(respectMaybeMoreData);return this;}}
??SIZE_TABLE由上述加粗代碼進行初始化 。 AdaptiveRecvByteBufAllocator內(nèi)部維護了一個SIZE_TABLE數(shù)組,記錄了不同的內(nèi)存的內(nèi)存塊大小,按照分配需要尋找最合適的內(nèi)存塊,SIZE_TABLE數(shù)組中的值為2^n,這樣便于軟硬件進行處理,SIZE_TABLE數(shù)組的初始化與PoolArena中的normalizeCapacity的初始化類似,當需要的內(nèi)存很小時 , 增長的幅度不大, 當需要的內(nèi)存較大時, 增長的幅度比較大,因此在[16,512]區(qū)間每次增加16,直到512,而從512起,每次翻一倍, 直到int的最大值 。 那size的具體大小值是什么呢?
SIZE_TABLE 數(shù)組的toString()打印如下 :
[16B, 32B, 48B, 64B, 80B, 96B, 112B, 128B, 144B, 160B, 176B, 192B, 208B, 224B, 240B, 256B, 272B, 288B, 304B, 320B, 336B, 352B, 368B, 384B, 400B, 416B, 432B, 448B, 464B, 480B, 496B, 512B, 1k, 2k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1M, 2M, 4M, 8M, 16M, 32M, 64M, 128M, 256M, 512M, 1G]
??當對內(nèi)部計算器Handle的具體實現(xiàn)類HandleImpl進行初始化時,可根據(jù)AdaptiveRecvByteBufAllocator的getSizeTableIndex()二分查找方法獲取SIZE_TABLE的下標index并保存,通過SIZE_TABLE[index]獲取下次需要分配的緩沖區(qū)大小nextReceiveBufferSize并記錄,緩沖區(qū)的最小容量屬性對SIZE_TABLE中的下標為minIndex的值 , 最大容量屬性對應的SIZE_TABLE中的下標為maxIndex的值及bool類型標識屬性decreaseNow ,這三個屬性用于判斷下一次創(chuàng)建緩沖區(qū)是否需要減少 。
??NioByteUnsafe每次循環(huán)完成后會根據(jù)實際讀取到的字節(jié)數(shù)和當前緩沖區(qū)的大小重新設(shè)置下次需要分配的緩沖區(qū)的大小。 具體代碼如下 。
// 循環(huán)讀取完后被調(diào)用 public void readComplete() {record(totalBytesRead()); }//返回已經(jīng)讀取的字節(jié)個數(shù),若‘totalBytesRead < 0’則說明已經(jīng)讀取的字節(jié)數(shù)已經(jīng)操作了’Integer.MAX_VALUE’,則返回Integer.MAX_VALUE;否則返回真實的已經(jīng)讀取的字節(jié)數(shù)。 protected final int totalBytesRead() {return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead; }
??可以模擬NioByteUnsafe的read()方法,在每次循環(huán)開始時, 一定要先重置totalMessages與totalByteRead(清零),讀取完成后, readComplete會計算并調(diào)整下次預計需要分配的緩沖區(qū)的大小, 具體代碼如下
public static void main(String[] args) throws Exception {AdaptiveRecvByteBufAllocator allocator = new AdaptiveRecvByteBufAllocator();RecvByteBufAllocator.Handle handle = allocator.newHandle();System.out.println("==============開始 I/O 讀事件模擬==============");// 讀取循環(huán)開始前先重置,將讀取的次數(shù)和字節(jié)數(shù)設(shè)置為0, 將totalMessages與totalBytesRead設(shè)置為0handle.reset(null);System.out.println(String.format("第一次模擬讀,需要分配大小 :%d", handle.guess()));handle.lastBytesRead(256);// 調(diào)整下次預測值handle.readComplete();// 在每次讀取數(shù)據(jù)時都需要重置totalMessage 與totalBytesReadhandle.reset(null);System.out.println(String.format("第2次花枝招展讀,需要分配大小:%d ", handle.guess()));handle.lastBytesRead(256);handle.readComplete();System.out.println("===============連續(xù)2次讀取的字節(jié)數(shù)小于默認分配的字節(jié)數(shù)= =========================");handle.reset(null);System.out.println(String.format("第3次模擬讀,需要分配大小 : %d", handle.guess()));handle.lastBytesRead(512);// 調(diào)整下次預測值,預測值應該增加到512 * 2 ^ 4handle.readComplete();System.out.println("==================讀取的字節(jié)數(shù)變大 ===============");handle.reset(null);// 讀循環(huán)中緩沖區(qū)的大小System.out.println(String.format("第4次模擬讀,需要分配的大小為:%d ", handle.guess())); }
??結(jié)果輸出
??當然啦,如果覺得自己已經(jīng)很明白了,可以看看下面這個例子。
public class Test2 {public static void main(String[] args) {AdaptiveRecvByteBufAllocator allocator = new AdaptiveRecvByteBufAllocator();RecvByteBufAllocator.Handle handle = allocator.newHandle();System.out.println("==============開始 I/O 讀事件模擬==============");// 讀取循環(huán)開始前先重置,將讀取的次數(shù)和字節(jié)數(shù)設(shè)置為0, 將totalMessages與totalBytesRead設(shè)置為0handle.reset(null);System.out.println(String.format("第一次模擬讀,需要分配大小 :%d", handle.guess()));handle.lastBytesRead(512);// 調(diào)整下次預測值handle.readComplete();// 在每次讀取數(shù)據(jù)時都需要重置totalMessage 與totalBytesReadhandle.reset(null);System.out.println(String.format("第2次花枝招展讀,需要分配大小:%d ", handle.guess()));handle.lastBytesRead(512);handle.readComplete();System.out.println("===============連續(xù)2次讀取的字節(jié)數(shù)小于默認分配的字節(jié)數(shù)= =========================");handle.reset(null);System.out.println(String.format("第3次模擬讀,需要分配大小 : %d", handle.guess()));} }
??最后一次結(jié)果輸出為1024,并沒有縮容,源碼讀到這里,我相信對輸出結(jié)果已經(jīng)沒有什么意外了。
接下來看一個例子。
Netty服務端代碼
public class NettyServer {public static void main(String[] args) {// 創(chuàng)建兩個線程組bossGroup 和workerGroup , 含有的子線程NioEventLoop 的個數(shù)默認為CPU 核數(shù)的兩倍// BossGroup只是處理連接請求,真正的和客戶端業(yè)務處理,會交給workerGroup完成EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 創(chuàng)建服務端的啟動對象ServerBootstrap bootstrap = new ServerBootstrap();// 使用鏈式編程來配置參數(shù)bootstrap.group(bossGroup, workerGroup)//設(shè)置兩個線程組.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel 作為服務器的通道實現(xiàn)// 初始化服務器連接隊列大小,服務端處理客戶端連接請求是順序處理的,所以同一時間,只能處理一個客戶端連接,多個客戶端同時來的時候// 服務端將不能處理的客戶端連接請求放在隊列中等待處理.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2056));// 對workerGroup 的SocketChannel設(shè)置處理器ch.pipeline().addLast(new NettyServerHandler());}});System.out.println("netty server start ....");// 綁定一個商品并且同步,生成一個ChannelFuture異步對象,通過isDone()等方法可以判斷異步事件的執(zhí)行情況// 啟動服務器(并綁定端口),bind是異步操作,sync方法是等待異步操作執(zhí)行完畢ChannelFuture cf = bootstrap.bind(9000).sync();// 給注冊監(jiān)聽器,監(jiān)聽我們關(guān)心的事件cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println("監(jiān)聽端口9000成功");} else {System.out.println("監(jiān)聽端口9000失敗");}}});// 對通道關(guān)閉進行監(jiān)聽,closeFuture是異步操作,監(jiān)聽通道關(guān)閉// 通過sync方法同步等待通道關(guān)閉處理完畢,這里會阻塞等待通道關(guān)閉完成cf.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }// 自定義Handler需要繼承netty 規(guī)定好的某個HandlerAdapter(規(guī)范) public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 讀取客戶端發(fā)送的數(shù)據(jù)** @param ctx 上下文對象,含有通道channel ,管道 pipeline* @param msg 就是客戶端發(fā)送的數(shù)據(jù)* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("服務器讀取的線程 :" + Thread.currentThread().getName());ByteBuf buf = (ByteBuf) msg;System.out.println("客戶端發(fā)送的消息是: " + buf.toString(CharsetUtil.UTF_8));}/*** 數(shù)據(jù)讀取完畢處理方法* @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("=================channelReadComplete======================");ByteBuf buf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);ctx.writeAndFlush(buf);}// 處理異常,一般需要關(guān)閉通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }
Netty客戶端代碼
public class NettyClient {public static void main(String[] args) {// 客戶端需要一個事件循環(huán)組EventLoopGroup group = new NioEventLoopGroup();try {// 創(chuàng)建客戶端啟動對象// 注意,客戶端使用的不是ServerBootstrap , 而是BootstrapBootstrap bootstrap = new Bootstrap();// 設(shè)置相關(guān)的參數(shù)bootstrap.group(group) //設(shè)置線程組.channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實現(xiàn).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler());}});System.out.println("netty client start ");// 啟動客戶端去連接服務器端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9000).sync();// 對關(guān)閉通道進行監(jiān)聽channelFuture.channel().closeFuture().sync();}catch (Exception e ){e.printStackTrace();}finally {group.shutdownGracefully();}} }public class NettyClientHandler extends ChannelInboundHandlerAdapter {// 當客戶端連接服務器完成就會觸發(fā)這個方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {StringBuffer sb = new StringBuffer();for(int i = 0 ;i < 1023;i ++){sb.append("a");}sb.append("中");sb.append("bbbb");ByteBuf buf = Unpooled.copiedBuffer(sb.toString(), CharsetUtil.UTF_8);ctx.writeAndFlush(buf);}// 當通道在讀取事件時會觸發(fā),即服務端發(fā)送數(shù)據(jù)給客戶端@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(" 收到服務端的消息: " + buf.toString(CharsetUtil.UTF_8));System.out.println("服務端的地址:" + ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }
??在NettyClientHandler的channelActive()方法中,先for循環(huán)寫了1023個字節(jié), 然后寫了一個"中" 字,utf-8編碼一個中文占3個字節(jié),再寫了4個"bbbb",因此最終寫入到ByteBuf中是1030個字節(jié) 。
??我們之前也分析過,第一次讀取時,ByteBuf默認容量為1024,因此在NioSocketChannel的read()方法中,while()循環(huán)中會循環(huán)兩遍。 如下圖所示 。
??而剛好在ByteBuf的[1024,1025,1026]這三個字節(jié)中被"中"的中字占用,因此ByteBuf取0~1023個字節(jié)時,“中”字被截斷了 。
??最終在服務端代碼中打印了兩次ByteBuf字符串信息,發(fā)現(xiàn)打印的信息中文亂碼。
??這個問題怎樣解決呢?
解決方案一
??如果說 Netty 默認提供了一個可變的緩沖區(qū)大小分配方案,那么我們可不可以改變這個策略呢?從AdaptiveRecvByteBufAllocator開始向上找到根類型,可以最終找到 RecvByteBufAllocator 接口上,查看這個接口的子類,應該會有其他緩沖區(qū)大小分配方案。
??這里有一個固定的接收數(shù)組空間分配器,現(xiàn)在只要想辦法把默認的 AdaptiveRecvByteBufAllocator換成 FixedRecvByteBufAllocator 就可以解決問題了。
??首先調(diào)用 config方法,然后調(diào)用getRecvByteBufAllocator來創(chuàng)建這個allocHandle。既然有g(shù)etRecvByteBufAllocator()方法,那肯定有setRecvByteBufAllocator()方法。
??因此只需要調(diào)用config()的setRecvByteBufAllocator()方法即可。
?? ByteBuf一次就打印完了,并沒有出現(xiàn)中文亂碼。
??對于這個問題, 還有另外一種解決方案。
方案二
??客戶端代碼修改
??在內(nèi)容前面添加內(nèi)容的長度 。
?? 在initChannel()方法中添加ch.pipeline().addLast(new NettyServerHandler2()) 一行代碼。
??的具體代碼如下
public class NettyServerHandler2 extends ByteToMessageDecoder {private int alreadyReadLength ;private int sumByteLength ;private ByteBuf buf ;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("NettyServerHandler2 invoke");if (sumByteLength == 0) {sumByteLength = in.readInt();buf = Unpooled.copiedBuffer("", CharsetUtil.UTF_8);}int readableBytes = in.readableBytes();alreadyReadLength += readableBytes;byte[] data = new byte[readableBytes];in.readBytes(data);buf.writeBytes(data);if (alreadyReadLength == sumByteLength) {sumByteLength = 0;byte[] outData = new byte[buf.readableBytes()];buf.readBytes(outData);out.add(new String(outData,"utf-8"));buf.release();buf = null;}} }
??寫一個Handler繼承ByteToMessageDecoder,而在這個類的內(nèi)部定義了三個屬性,alreadyReadLength記錄已經(jīng)讀取的字節(jié)數(shù), sumByteLength本次客戶端發(fā)送過來的總字節(jié)數(shù), buf 臨時存儲客戶端傳遞過來的字節(jié),當alreadyReadLength和sumByteLength相等時,則表示字節(jié)已經(jīng)讀取完全 。 此時可以將數(shù)據(jù)寫回到out中。因此NettyServerHandler2的主要作用就是合并客戶端傳遞過來的字節(jié),從而避免客戶端數(shù)據(jù)還沒有讀取完就時行業(yè)務處理。
??沒有出現(xiàn)亂碼問題了,個人覺得第二種方案比第一種方案更加好,因為在第一種方案中調(diào)用了ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2056)) 方法,指定了RecvByteBufAllocator 為FixedRecvByteBufAllocator,并且初始化ByteBuf的容量為2056,如果此次用戶發(fā)送的byte長度是1030,這是已知的,如果用戶第一次請求的字節(jié)長度是3000,是不是又要修改FixedRecvByteBufAllocator中的bufferSize的值為3000,又要重啟Netty服務器,顯然不適用于生產(chǎn)環(huán)境 。 而第二種方案基本上適用于所有的情況 ,當然啦,第二種情況在NettyServerHandler2定義了三個局部變量alreadyReadLength,sumByteLength,buf 那會不會存在并發(fā)問題呢? 這個不得而知,因為我自己對Netty也是在不斷的學習中 ,具體的情況,我會在下一篇博客去求證,但這里也給我們提供了一種解決問題的思路,希望給讀者有借鑒意義 。
第三種解決方案
??當然對于之前提到問題,還有第三種解決方案,我們利用LengthFieldBasedFrameDecoder來解決 。
- 在NettyServer中, 在ChannelInitializer的initChannel方法中添加自定義handler NettyServerHandler2 ,這個類繼承LengthFieldBasedFrameDecoder實現(xiàn)了decode()方法 。
??在NettyServerHandler2的構(gòu)造方法中傳遞了3個參數(shù),這3個參數(shù)的含義為
- maxFrameLength : 發(fā)送的數(shù)據(jù)包最大長度, 發(fā)送數(shù)據(jù)包的最大長度,例如1024,表示一個數(shù)據(jù)包最多可發(fā)送1024個字節(jié)
- lengthFieldOffset: 長度字段的偏移量, 指的是長度字段位于數(shù)據(jù)包內(nèi)部字節(jié)數(shù)組中的下標值
- engthFieldLength: 長度字段自己占用的字節(jié)數(shù),如果長度字段是一個int整數(shù),則為4,如果長度字段是一個short整數(shù),則為2
- lengthAdjustment: 長度字段的偏移量矯正, 這個參數(shù)最為難懂,在傳輸協(xié)議比較復雜的情況下,例如包含了長度字段,協(xié)議版本號, 魔數(shù)等
??那么解碼時,就需要進行長度字段的矯正,長度矯正值的計算公式為:內(nèi)容字段偏移量 - 長度字段偏移量 - 長度字段的字節(jié)數(shù)
- 寫一個NettyServerHandler2繼承LengthFieldBasedFrameDecoder類
public class NettyServerHandler2 extends LengthFieldBasedFrameDecoder {public NettyServerHandler2(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {super(maxFrameLength, lengthFieldOffset, lengthFieldLength);}@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {in = (ByteBuf) super.decode(ctx,in);if(in == null){return null;}if(in.readableBytes()<4){throw new Exception("字節(jié)數(shù)不足");}//讀取length字段int length = in.readInt();if(in.readableBytes()!=length){throw new Exception("標記的長度不符合實際長度");}//content內(nèi)容byte []bytes = new byte[length];in.readBytes(bytes);return new String(bytes,"UTF-8");} }
??看輸出結(jié)果
??從結(jié)果輸出來看,也是一次性打印出所有數(shù)據(jù),并沒有出現(xiàn)中文亂碼。 這是第三種解決方案,當然,關(guān)于這一塊的源碼,在下一篇博客 Netty 源碼解析(中) 我會做詳細的分析,這里就不再贅述了。
??到這里這篇博客就告一段落,下一篇博客見。
本文對應github地址為
https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git