北京正規(guī)網(wǎng)站建設(shè)公司百度助手安卓版下載
前言
學(xué)習(xí)Spark源碼繞不開通信,Spark通信是基于Netty實(shí)現(xiàn)的,所以先簡(jiǎn)單學(xué)習(xí)總結(jié)一下Netty。
Spark 通信歷史
最開始: Akka
Spark 1.3: 開始引入Netty,為了解決大塊數(shù)據(jù)(如Shuffle)的傳輸問題
Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全廢棄Akka,全部使用Netty
Akka 是一個(gè)用 Scala 編寫的庫,用于簡(jiǎn)化編寫容錯(cuò)的、高可伸縮性的 Java 和 Scala 的 Actor 模型應(yīng)用。
Spark 借鑒Akka 通過 Netty 實(shí)現(xiàn)了類似的簡(jiǎn)約版的Actor 模型
Netty
Server 主要代碼:
// 創(chuàng)建ServerBootstrap實(shí)例,服務(wù)器啟動(dòng)對(duì)象
ServerBootstrap bootstrap = new ServerBootstrap();ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 等待服務(wù)器關(guān)閉
channelFuture.channel().closeFuture().sync();
主要是啟動(dòng) ServerBootstrap、綁定端口、等待關(guān)閉。
Client 主要代碼:
// 創(chuàng)建Bootstrap實(shí)例,客戶端啟動(dòng)對(duì)象
Bootstrap bootstrap = new Bootstrap();
// 連接服務(wù)端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
Server 添加 Handler
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ServerHandler());}
});
bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}
});
這里的 ServerHandler 和 ClientHandler 都是自己實(shí)現(xiàn)的類,處理具體的邏輯。
如channelActive 建立連接時(shí)發(fā)消息給服務(wù)器,channelRead 讀取數(shù)據(jù)時(shí)調(diào)用,處理讀取數(shù)據(jù)的邏輯。給服務(wù)器或者客戶端發(fā)消息可以用 writeAndFlush 方法。
完整代碼
地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo
NettyServer
package com.dkl.java.demo;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {try {bind();} catch (InterruptedException e) {throw new RuntimeException(e);}}public static void bind() throws InterruptedException {// 創(chuàng)建boss線程組,用于接收連接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 創(chuàng)建worker線程組,用于處理連接上的I/O操作,含有子線程N(yùn)ioEventGroup個(gè)數(shù)為CPU核數(shù)大小的2倍EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 創(chuàng)建ServerBootstrap實(shí)例,服務(wù)器啟動(dòng)對(duì)象ServerBootstrap bootstrap = new ServerBootstrap();// 使用鏈?zhǔn)骄幊膛渲脜?shù)// 將boss線程組和worker線程組暫存到ServerBootstrapbootstrap.group(bossGroup, workerGroup);// 設(shè)置服務(wù)端Channel類型為NioServerSocketChannel作為通道實(shí)現(xiàn)bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 添加ServerHandler到ChannelPipeline,對(duì)workerGroup的SocketChannel(客戶端)設(shè)置處理器socketChannel.pipeline().addLast(new ServerHandler());}});// 設(shè)置啟動(dòng)參數(shù),初始化服務(wù)器連接隊(duì)列大小。服務(wù)端處理客戶端連接請(qǐng)求是順序處理,一個(gè)時(shí)間內(nèi)只能處理一個(gè)客戶端請(qǐng)求// 當(dāng)有多個(gè)客戶端同時(shí)來請(qǐng)求時(shí),未處理的請(qǐng)求先放入隊(duì)列中bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 綁定端口并啟動(dòng)服務(wù)器,bind方法是異步的,sync方法是等待異步操作執(zhí)行完成,返回ChannelFuture異步對(duì)象ChannelFuture channelFuture = bootstrap.bind(8888).sync();// 等待服務(wù)器關(guān)閉channelFuture.channel().closeFuture().sync();} finally {// 優(yōu)雅地關(guān)閉boss線程組bossGroup.shutdownGracefully();// 優(yōu)雅地關(guān)閉worker線程組workerGroup.shutdownGracefully();}}
}
ServerHandler
package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 當(dāng) Channel 已經(jīng)注冊(cè)到它的 EventLoop 并且能夠處理 I/O 時(shí)被調(diào)用** @param ctx* @throws Exception*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelRegistered");}/*** 當(dāng) Channel 從它的 EventLoop 注銷并且無法處理任何 I/O 時(shí)被調(diào)* 用** @param ctx* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelUnregistered");}/*** 當(dāng) Channel 處于活動(dòng)狀態(tài)時(shí)被調(diào)用;Channel 已經(jīng)連接/綁定并且已經(jīng)就緒** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelActive");}/*** 當(dāng) Channel 離開活動(dòng)狀態(tài)并且不再連接它的遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelInactive");}/*** 當(dāng)從 Channel 讀取數(shù)據(jù)時(shí)被調(diào)用** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("執(zhí)行 channelRead");// 處理接收到的數(shù)據(jù)ByteBuf byteBuf = (ByteBuf) msg;try {// 將接收到的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("Server端收到客戶消息: " + message);// 發(fā)送響應(yīng)消息給客戶端ctx.writeAndFlush(Unpooled.copiedBuffer("我是服務(wù)端,我收到你的消息啦~", CharsetUtil.UTF_8));} finally {// 釋放ByteBuf資源ReferenceCountUtil.release(byteBuf);}}/*** 當(dāng) Channel 上的一個(gè)讀操作完成時(shí)被調(diào)用,對(duì)通道的讀取完成的事件或通知。當(dāng)讀取完成可通知發(fā)送方或其他的相關(guān)方,告訴他們接受方讀取完成** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelReadComplete");}/*** 當(dāng) ChannelnboundHandler.fireUserEventTriggered()方法被調(diào)用時(shí)被* 調(diào)用** @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {System.out.println("執(zhí)行 userEventTriggered");}/*** 當(dāng) Channel 的可寫狀態(tài)發(fā)生改變時(shí)被調(diào)用??梢酝ㄟ^調(diào)用 Channel 的 isWritable()方法* * 來檢測(cè) Channel 的可寫性。與可寫性相關(guān)的閾值可以通過* * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法來* * 設(shè)置** @param ctx* @throws Exception*/@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelWritabilityChanged");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("執(zhí)行 exceptionCaught");// 異常處理cause.printStackTrace();ctx.close();}
}
NettyClient
package com.dkl.java.demo;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {start();}public static void start() {// 創(chuàng)建EventLoopGroup,用于處理客戶端的I/O操作EventLoopGroup groupThread = new NioEventLoopGroup();try {// 創(chuàng)建Bootstrap實(shí)例,客戶端啟動(dòng)對(duì)象Bootstrap bootstrap = new Bootstrap();bootstrap.group(groupThread);// 設(shè)置服務(wù)端Channel類型為NioSocketChannel作為通道實(shí)現(xiàn)bootstrap.channel(NioSocketChannel.class);// 設(shè)置客戶端處理bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}});// 綁定端口ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 優(yōu)雅地關(guān)閉線程groupThread.shutdownGracefully();}}
}
ClientHandler
package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 連接建立時(shí)的處理,發(fā)送請(qǐng)求消息給服務(wù)器ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服務(wù)端!我是客戶端,測(cè)試通道連接", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 處理接收到的數(shù)據(jù)ByteBuf byteBuf = (ByteBuf) msg;try {// 將接收到的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("受到服務(wù)端響應(yīng)的消息: " + message);// TODO: 對(duì)數(shù)據(jù)進(jìn)行業(yè)務(wù)處理} finally {// 釋放ByteBuf資源ReferenceCountUtil.release(byteBuf);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 異常處理cause.printStackTrace();ctx.close();}
}
運(yùn)行截圖
handler 執(zhí)行順序
Server 端
連接時(shí):執(zhí)行 channelRegistered
執(zhí)行 channelActive
執(zhí)行 channelRead
執(zhí)行 channelReadComplete斷開連接時(shí):執(zhí)行 channelReadComplete
(強(qiáng)制中斷 Client 連接
執(zhí)行 exceptionCaught
執(zhí)行 userEventTriggered (exceptionCaught 中 ctx.close()) 觸發(fā)
)
執(zhí)行 channelInactive
執(zhí)行 channelUnregisteredchannelReadComplete 中 ctx.close(); 觸發(fā):
執(zhí)行 channelInactive
執(zhí)行 channelUnregistered
Client 端
執(zhí)行 channelRegistered
執(zhí)行 channelActive
執(zhí)行 channelRead
執(zhí)行 channelReadComplete
Spark 對(duì)應(yīng)位置
- Spark版本:3.2.3
- Server: org.apache.spark.network.server.TransportServer.init
- Client: org.apache.spark.network.client.TransportClientFactory.createClient