中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

北京正規(guī)網(wǎng)站建設(shè)公司百度助手安卓版下載

北京正規(guī)網(wǎng)站建設(shè)公司,百度助手安卓版下載,公司網(wǎng)站的seo優(yōu)化怎么做,開發(fā)網(wǎng)站用得最多的是什么語言前言 學(xué)習(xí)Spark源碼繞不開通信,Spark通信是基于Netty實(shí)現(xiàn)的,所以先簡(jiǎn)單學(xué)習(xí)總結(jié)一下Netty。 Spark 通信歷史 最開始: Akka Spark 1.3: 開始引入Netty,為了解決大塊數(shù)據(jù)(如Shuffle)的傳輸問題 Spark 1.6&…

前言

學(xué)習(xí)Spark源碼繞不開通信,Spark通信是基于Netty實(shí)現(xiàn)的,所以先簡(jiǎn)單學(xué)習(xí)總結(jié)一下Netty。

Spark 通信歷史

最開始: Akka
Spark 1.3: 開始引入Netty,為了解決大塊數(shù)據(jù)(如Shuffle)的傳輸問題
Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全廢棄Akka,全部使用Netty

Akka 是一個(gè)用 Scala 編寫的庫,用于簡(jiǎn)化編寫容錯(cuò)的、高可伸縮性的 Java 和 Scala 的 Actor 模型應(yīng)用。
Spark 借鑒Akka 通過 Netty 實(shí)現(xiàn)了類似的簡(jiǎn)約版的Actor 模型

Netty

Server 主要代碼:

// 創(chuàng)建ServerBootstrap實(shí)例,服務(wù)器啟動(dòng)對(duì)象
ServerBootstrap bootstrap = new ServerBootstrap();ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 等待服務(wù)器關(guān)閉
channelFuture.channel().closeFuture().sync();

主要是啟動(dòng) ServerBootstrap、綁定端口、等待關(guān)閉。

Client 主要代碼:

// 創(chuàng)建Bootstrap實(shí)例,客戶端啟動(dòng)對(duì)象
Bootstrap bootstrap = new Bootstrap();
// 連接服務(wù)端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();

Server 添加 Handler

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ServerHandler());}
});
bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}
});

這里的 ServerHandler 和 ClientHandler 都是自己實(shí)現(xiàn)的類,處理具體的邏輯。
如channelActive 建立連接時(shí)發(fā)消息給服務(wù)器,channelRead 讀取數(shù)據(jù)時(shí)調(diào)用,處理讀取數(shù)據(jù)的邏輯。給服務(wù)器或者客戶端發(fā)消息可以用 writeAndFlush 方法。

完整代碼

地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo

NettyServer

package com.dkl.java.demo;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {try {bind();} catch (InterruptedException e) {throw new RuntimeException(e);}}public static void bind() throws InterruptedException {// 創(chuàng)建boss線程組,用于接收連接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 創(chuàng)建worker線程組,用于處理連接上的I/O操作,含有子線程N(yùn)ioEventGroup個(gè)數(shù)為CPU核數(shù)大小的2倍EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 創(chuàng)建ServerBootstrap實(shí)例,服務(wù)器啟動(dòng)對(duì)象ServerBootstrap bootstrap = new ServerBootstrap();// 使用鏈?zhǔn)骄幊膛渲脜?shù)// 將boss線程組和worker線程組暫存到ServerBootstrapbootstrap.group(bossGroup, workerGroup);// 設(shè)置服務(wù)端Channel類型為NioServerSocketChannel作為通道實(shí)現(xiàn)bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 添加ServerHandler到ChannelPipeline,對(duì)workerGroup的SocketChannel(客戶端)設(shè)置處理器socketChannel.pipeline().addLast(new ServerHandler());}});// 設(shè)置啟動(dòng)參數(shù),初始化服務(wù)器連接隊(duì)列大小。服務(wù)端處理客戶端連接請(qǐng)求是順序處理,一個(gè)時(shí)間內(nèi)只能處理一個(gè)客戶端請(qǐng)求// 當(dāng)有多個(gè)客戶端同時(shí)來請(qǐng)求時(shí),未處理的請(qǐng)求先放入隊(duì)列中bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 綁定端口并啟動(dòng)服務(wù)器,bind方法是異步的,sync方法是等待異步操作執(zhí)行完成,返回ChannelFuture異步對(duì)象ChannelFuture channelFuture = bootstrap.bind(8888).sync();// 等待服務(wù)器關(guān)閉channelFuture.channel().closeFuture().sync();} finally {// 優(yōu)雅地關(guān)閉boss線程組bossGroup.shutdownGracefully();// 優(yōu)雅地關(guān)閉worker線程組workerGroup.shutdownGracefully();}}
}

ServerHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 當(dāng) Channel 已經(jīng)注冊(cè)到它的 EventLoop 并且能夠處理 I/O 時(shí)被調(diào)用** @param ctx* @throws Exception*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelRegistered");}/*** 當(dāng) Channel 從它的 EventLoop 注銷并且無法處理任何 I/O 時(shí)被調(diào)* 用** @param ctx* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelUnregistered");}/*** 當(dāng) Channel 處于活動(dòng)狀態(tài)時(shí)被調(diào)用;Channel 已經(jīng)連接/綁定并且已經(jīng)就緒** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelActive");}/*** 當(dāng) Channel 離開活動(dòng)狀態(tài)并且不再連接它的遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelInactive");}/*** 當(dāng)從 Channel 讀取數(shù)據(jù)時(shí)被調(diào)用** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("執(zhí)行 channelRead");// 處理接收到的數(shù)據(jù)ByteBuf byteBuf = (ByteBuf) msg;try {// 將接收到的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("Server端收到客戶消息: " + message);// 發(fā)送響應(yīng)消息給客戶端ctx.writeAndFlush(Unpooled.copiedBuffer("我是服務(wù)端,我收到你的消息啦~", CharsetUtil.UTF_8));} finally {// 釋放ByteBuf資源ReferenceCountUtil.release(byteBuf);}}/*** 當(dāng) Channel 上的一個(gè)讀操作完成時(shí)被調(diào)用,對(duì)通道的讀取完成的事件或通知。當(dāng)讀取完成可通知發(fā)送方或其他的相關(guān)方,告訴他們接受方讀取完成** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelReadComplete");}/*** 當(dāng) ChannelnboundHandler.fireUserEventTriggered()方法被調(diào)用時(shí)被* 調(diào)用** @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {System.out.println("執(zhí)行 userEventTriggered");}/*** 當(dāng) Channel 的可寫狀態(tài)發(fā)生改變時(shí)被調(diào)用??梢酝ㄟ^調(diào)用 Channel 的 isWritable()方法* * 來檢測(cè) Channel 的可寫性。與可寫性相關(guān)的閾值可以通過* * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法來* * 設(shè)置** @param ctx* @throws Exception*/@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {System.out.println("執(zhí)行 channelWritabilityChanged");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("執(zhí)行 exceptionCaught");// 異常處理cause.printStackTrace();ctx.close();}
}

NettyClient

package com.dkl.java.demo;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {start();}public static void start() {// 創(chuàng)建EventLoopGroup,用于處理客戶端的I/O操作EventLoopGroup groupThread = new NioEventLoopGroup();try {// 創(chuàng)建Bootstrap實(shí)例,客戶端啟動(dòng)對(duì)象Bootstrap bootstrap = new Bootstrap();bootstrap.group(groupThread);// 設(shè)置服務(wù)端Channel類型為NioSocketChannel作為通道實(shí)現(xiàn)bootstrap.channel(NioSocketChannel.class);// 設(shè)置客戶端處理bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}});// 綁定端口ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 優(yōu)雅地關(guān)閉線程groupThread.shutdownGracefully();}}
}

ClientHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 連接建立時(shí)的處理,發(fā)送請(qǐng)求消息給服務(wù)器ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服務(wù)端!我是客戶端,測(cè)試通道連接", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 處理接收到的數(shù)據(jù)ByteBuf byteBuf = (ByteBuf) msg;try {// 將接收到的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("受到服務(wù)端響應(yīng)的消息: " + message);// TODO: 對(duì)數(shù)據(jù)進(jìn)行業(yè)務(wù)處理} finally {// 釋放ByteBuf資源ReferenceCountUtil.release(byteBuf);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 異常處理cause.printStackTrace();ctx.close();}
}

運(yùn)行截圖


handler 執(zhí)行順序

Server 端

連接時(shí):執(zhí)行 channelRegistered
執(zhí)行 channelActive
執(zhí)行 channelRead
執(zhí)行 channelReadComplete斷開連接時(shí):執(zhí)行 channelReadComplete
(強(qiáng)制中斷 Client 連接
執(zhí)行 exceptionCaught
執(zhí)行 userEventTriggered (exceptionCaught 中 ctx.close()) 觸發(fā)
)
執(zhí)行 channelInactive
執(zhí)行 channelUnregisteredchannelReadComplete 中 ctx.close(); 觸發(fā):
執(zhí)行 channelInactive
執(zhí)行 channelUnregistered

Client 端

執(zhí)行 channelRegistered
執(zhí)行 channelActive
執(zhí)行 channelRead
執(zhí)行 channelReadComplete

Spark 對(duì)應(yīng)位置

  • Spark版本:3.2.3
  • Server: org.apache.spark.network.server.TransportServer.init
  • Client: org.apache.spark.network.client.TransportClientFactory.createClient


http://m.risenshineclean.com/news/60040.html

相關(guān)文章:

  • 外國(guó)大氣網(wǎng)站設(shè)計(jì)谷歌首頁
  • 企業(yè)信息管理系統(tǒng)免費(fèi)小吳seo博客
  • 網(wǎng)頁瀏覽器阻止安裝activex控件惠州seo排名外包
  • 網(wǎng)站要怎么做才能獲得市場(chǎng)份額百度開戶返點(diǎn)
  • 深圳網(wǎng)絡(luò)做網(wǎng)站百度指數(shù)在線查詢
  • 成都的設(shè)計(jì)院有哪些上海小紅書seo
  • 有哪些做特賣的網(wǎng)站福建seo排名
  • 廣州做網(wǎng)店哪個(gè)網(wǎng)站批發(fā)網(wǎng)百度查詢最火的關(guān)鍵詞
  • 有想做企業(yè)網(wǎng)站建設(shè)微商怎么引流被別人加
  • magento網(wǎng)站遷移seo排名優(yōu)化有哪些
  • 重慶網(wǎng)站網(wǎng)頁設(shè)計(jì)培訓(xùn)機(jī)構(gòu)網(wǎng)站統(tǒng)計(jì)平臺(tái)
  • html5可以做動(dòng)態(tài)網(wǎng)站網(wǎng)絡(luò)關(guān)鍵詞優(yōu)化方法
  • 高德地圖開發(fā)平臺(tái)淘寶seo搜索優(yōu)化
  • 用心做的網(wǎng)站軟件開發(fā)公司推薦
  • 網(wǎng)站開發(fā)軟件手機(jī)版網(wǎng)絡(luò)科技公司騙了我36800
  • 專用車網(wǎng)站建設(shè)哪家好比較靠譜的電商培訓(xùn)機(jī)構(gòu)
  • 北京建行網(wǎng)站營(yíng)銷策劃方案
  • 做網(wǎng)站收入長(zhǎng)沙正規(guī)競(jìng)價(jià)優(yōu)化推薦
  • 手機(jī)端怎么網(wǎng)站建設(shè)seo標(biāo)簽優(yōu)化
  • 自己有網(wǎng)站怎么做點(diǎn)卡網(wǎng)絡(luò)推廣的方法有
  • 做網(wǎng)站掙錢么網(wǎng)站推廣是做什么的
  • 網(wǎng)站里的搜索怎么做免費(fèi)制作自己的網(wǎng)站
  • 西安本地十家做網(wǎng)站建設(shè)的公司seo網(wǎng)站排名的軟件
  • 醫(yī)學(xué)院英文網(wǎng)站建設(shè)方案廣州網(wǎng)絡(luò)推廣哪家好
  • 上海網(wǎng)站開發(fā)哪里有外鏈發(fā)布網(wǎng)站
  • 如何在vs做網(wǎng)站免費(fèi)線上培訓(xùn)平臺(tái)
  • 關(guān)于政府網(wǎng)站建設(shè)的幾點(diǎn)建議免費(fèi)個(gè)人網(wǎng)頁制作
  • 蕭山城區(qū)建設(shè)有限公司網(wǎng)站公司官網(wǎng)制作多少錢
  • 做網(wǎng)站有沒有受騙過免費(fèi)刷贊網(wǎng)站推廣免費(fèi)
  • 網(wǎng)站建設(shè)費(fèi)怎么做分錄seo案例視頻教程