中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

怎么做外貿(mào)推廣網(wǎng)站搜索關(guān)鍵詞優(yōu)化

怎么做外貿(mào)推廣,網(wǎng)站搜索關(guān)鍵詞優(yōu)化,php與H5做網(wǎng)站,微博網(wǎng)站認(rèn)證 備案名稱目錄 一、客戶端代碼實(shí)現(xiàn) 1.1、需求分析 1.2、具體實(shí)現(xiàn) 1)實(shí)現(xiàn) ConnectionFactory 2)實(shí)現(xiàn) Connection 3)實(shí)現(xiàn) Channel 二、編寫 Demo 2.1、實(shí)例 2.1、實(shí)例演示 一、客戶端代碼實(shí)現(xiàn) 1.1、需求分析 RabbitMQ 的客戶端設(shè)定&#xff…

目錄

一、客戶端代碼實(shí)現(xiàn)

1.1、需求分析

1.2、具體實(shí)現(xiàn)

1)實(shí)現(xiàn) ConnectionFactory

2)實(shí)現(xiàn) Connection

3)實(shí)現(xiàn) Channel

二、編寫 Demo?

2.1、實(shí)例?

2.1、實(shí)例演示


一、客戶端代碼實(shí)現(xiàn)


1.1、需求分析

RabbitMQ 的客戶端設(shè)定:一個(gè)客戶端可以有多個(gè)模塊,每個(gè)模塊都可以和 broker server 之間建立 “邏輯上的連接” (channel),這幾個(gè)模塊的channel 彼此之間是互相不影響的,同時(shí)這幾個(gè) channel 又復(fù)用的同一個(gè) TCP 連接,省去了頻繁 建立/銷毀?TCP 連接的開銷(三次握手、四次揮手......).

這里,我們也按照這樣的邏輯實(shí)現(xiàn) 消息隊(duì)列 的客戶端,主要涉及到以下三個(gè)核心類:

  1. ConnectionFactory:連接工廠,這個(gè)類持有服務(wù)器的地址,主要功能就是創(chuàng)建 Connection 對(duì)象.
  2. Connection:表示一個(gè) TCP連接,持有 Socket 對(duì)象,用來 寫入請(qǐng)求/讀取響應(yīng),管理多個(gè)Channel 對(duì)象.
  3. Channel:表示一個(gè)邏輯上的連接,需要提供一系列的方法,去和服務(wù)器提供的核心 API 對(duì)應(yīng)(客戶端提供的這些方法的內(nèi)部,就是寫入了一個(gè)特定的請(qǐng)求,然后等待服務(wù)器響應(yīng)).

1.2、具體實(shí)現(xiàn)

1)實(shí)現(xiàn) ConnectionFactory

主要用來創(chuàng)建 Connection 對(duì)象.

public class ConnectionFactory {//broker server 的 ip 地址private String host;//broker server 的端口號(hào)private int port;//    //訪問 broker server 的哪個(gè)虛擬主機(jī)
//    //這里暫時(shí)先不涉及
//    private String virtualHostName;
//    private String username;
//    private String password;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}
}

2)實(shí)現(xiàn) Connection

屬性如下

    private Socket socket;//一個(gè) socket 連接需要管理多個(gè) channelprivate ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;// DataXXX 主要用來 讀取/寫入 特定格式數(shù)據(jù)(例如 readInt())private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;//用來處理 0xc 的回調(diào),這里開銷可能會(huì)很大,不希望把 Connection 阻塞住,因此使用 線程池 來處理private ExecutorService callbackPool;

構(gòu)造如下

這里不光需要初始化屬性,還需要?jiǎng)?chuàng)建一個(gè)掃描線程,由這個(gè)線程負(fù)責(zé)不停的從 socket 中讀取響應(yīng)數(shù)據(jù),把這個(gè)響應(yīng)數(shù)據(jù)再交給對(duì)應(yīng)的 channel 負(fù)責(zé)處理

    public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//創(chuàng)建一個(gè)掃描線程,由這個(gè)線程負(fù)責(zé)不停的從 socket 中讀取響應(yīng)數(shù)據(jù),把這個(gè)響應(yīng)數(shù)據(jù)再交給對(duì)應(yīng)的 channel 負(fù)責(zé)處理Thread t = new Thread(() -> {try {while(!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {//連接正常斷開的,此時(shí)這個(gè)異??梢院雎許ystem.out.println("[Connection] 連接正常斷開!");} catch(IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 連接異常斷開!");e.printStackTrace();}});t.start();}

釋放 Connection 相關(guān)資源

    public void close() {try {callbackPool.shutdown();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

使用這個(gè)方法來區(qū)別,當(dāng)前的響應(yīng)是一個(gè)針對(duì)控制請(qǐng)求的響應(yīng),還是服務(wù)器推送過來的消息.

如果是服務(wù)器推送過來的消息,就響應(yīng)表明是 0xc,也就是一個(gè)回調(diào),通過線程池來進(jìn)行處理;

如果只是一個(gè)普通的響應(yīng),就把這個(gè)結(jié)果放到 channel 的 哈希表中(隨后 channel 會(huì)喚醒所有阻塞等待響應(yīng)的線程,去 map 中拿數(shù)據(jù)).

    public void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if(response.getType() == 0xc) {//服務(wù)器推送過來的消息數(shù)據(jù)SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//根據(jù) channelId 找到對(duì)應(yīng)的 channel 對(duì)象Channel channel = channelMap.get(subScribeReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 該消息對(duì)應(yīng)的 channel 再客戶端中不存在!channelId=" + channel.getChannelId());}//執(zhí)行該 channel 對(duì)象內(nèi)部的回調(diào)(這里的開銷未知,有可能很大,同時(shí)不希望把這里阻塞住,所以使用線程池來執(zhí)行)callbackPool.submit(() -> {try {channel.getConsumer().handlerDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch(MqException | IOException e) {e.printStackTrace();}});} else {//當(dāng)前響應(yīng)是針對(duì)剛才的控制請(qǐng)求的響應(yīng)BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());//把這個(gè)結(jié)果放到 channel 的 哈希表中Channel channel = channelMap.get(basicReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 該消息對(duì)應(yīng)的 channel 在客戶端中不存在!channelId=" + channel.getChannelId());}channel.putReturns(basicReturns);}}

發(fā)送請(qǐng)求和讀取響應(yīng)

    /*** 發(fā)送請(qǐng)求* @param request* @throws IOException*/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 發(fā)送請(qǐng)求!type=" + request.getType() + ", length=" + request.getLength());}/*** 讀取響應(yīng)*/public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if(n != response.getLength()) {throw new IOException("讀取的響應(yīng)格式不完整! n=" + n + ", responseLen=" + response.getLength());}response.setPayload(payload);System.out.println("[Connection] 收到響應(yīng)!type=" + response.getType() + ", length=" + response.getLength());return response;}

在 Connection 中提供創(chuàng)建 Channel 的方法

    public Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);//放到 Connection 管理的 channel 的 Map 集合中channelMap.put(channelId, channel);//同時(shí)也需要把 “創(chuàng)建channel” 這個(gè)消息告訴服務(wù)器boolean ok = channel.createChannel();if(!ok) {//如果創(chuàng)建失敗,就說明這次創(chuàng)建 channel 操作不順利//把剛才加入 hash 表的鍵值對(duì)再刪了channelMap.remove(channelId);return null;}return channel;}

Ps:代碼中使用了很多次 UUID ,這里我們和之前一樣,使用加前綴的方式來進(jìn)行區(qū)分.

3)實(shí)現(xiàn) Channel

屬性和構(gòu)造如下

    private String channelId;// 當(dāng)前這個(gè) channel 是屬于哪一個(gè)連接private Connection connection;//用來存儲(chǔ)后續(xù)客戶端收到的服務(wù)器響應(yīng),已經(jīng)辨別是哪個(gè)響應(yīng)(要對(duì)的上號(hào)) key 是 ridprivate ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();//如果當(dāng)前 Channel 訂閱了某個(gè)隊(duì)列,就需要記錄對(duì)應(yīng)的回調(diào)是什么,當(dāng)該隊(duì)列消息返回回來的時(shí)候,調(diào)用回調(diào)//此處約定一個(gè) Channel 只能有一個(gè)回調(diào)private Consumer consumer;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}public Connection getConnection() {return connection;}public void setConnection(Connection connection) {this.connection = connection;}public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {return basicReturnsMap;}public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {this.basicReturnsMap = basicReturnsMap;}public Consumer getConsumer() {return consumer;}public void setConsumer(Consumer consumer) {this.consumer = consumer;

實(shí)現(xiàn) 0x1 創(chuàng)建 channel

主要就是構(gòu)造構(gòu)造出 request,然后發(fā)送請(qǐng)求到 BrokerServer 服務(wù)器,阻塞等待服務(wù)器響應(yīng).

    /*** 0x1* 和服務(wù)器進(jìn)行交互,告訴服務(wù)器,此處客戶端已經(jīng)創(chuàng)建了新的 channel 了* @return*/public boolean createChannel() throws IOException {//構(gòu)造 payloadBasicArguments arguments = new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(arguments);//發(fā)送請(qǐng)求Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);//等待服務(wù)器響應(yīng)BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}/*** 生成 rid* @return*/public String generateRid() {return "R-" + UUID.randomUUID().toString();}/*** 阻塞等待服務(wù)器響應(yīng)* @param rid* @return*/private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while((basicReturns = basicReturnsMap.get(rid)) == null) {//查詢結(jié)果為空,就說明咱們?nèi)ゲ锁B驛站要取的包裹還沒到//此時(shí)就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}basicReturnsMap.remove(rid);return basicReturns;}/*** 由 Connection 中的方法調(diào)用,區(qū)分為普通響應(yīng)之后觸發(fā)* 將響應(yīng)放回到 channel 管理的 map 中,并喚醒所有線程* @param basicReturns*/public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {//當(dāng)前也不知道有多少線程再等待上述的這個(gè)響應(yīng)//因此就把所有等待的線程喚醒notifyAll();}}

Ps:其他的 請(qǐng)求操作也和 0x1 的方式幾乎一樣,這里不一一展示了,主要說一下 0xa

0xa 消費(fèi)者訂閱隊(duì)列消息,這里要先設(shè)置好回調(diào)到屬性中,方便 Connection 通過這個(gè)屬性來 處理回調(diào)

值得注意的一點(diǎn), 我們約定 channelId 就是 consumerTag

    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws IOException, MqException {//先設(shè)置回調(diào)if(this.consumer != null) {throw new MqException("該 channel 已經(jīng)設(shè)置過消費(fèi)消息回調(diào)了,不能重復(fù)!");}this.consumer = consumer;BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId); // 注意:此處的 consumerTag 使用 channelId 來表示basicConsumeArguments.setQueueName(queueName);basicConsumeArguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(basicConsumeArguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());return basicReturns.isOk();}

二、編寫 Demo?


2.1、實(shí)例?

到了這里基本就實(shí)現(xiàn)完成了一個(gè) 跨主機(jī)/服務(wù)器 之間的生產(chǎn)者消費(fèi)者模型了(功能上可以滿足日常開發(fā)對(duì)消息隊(duì)列的使用),但是還具有很強(qiáng)的擴(kuò)展性,可以繼續(xù)參考 RabbitMQ,如果有想法的,或者是遇到不會(huì)的問題,可以私信我~

以下我來我來編寫一個(gè) demo,模擬 跨主機(jī)/服務(wù)器 之間的生產(chǎn)者消費(fèi)者模型(這里為了方便,就在本機(jī)演示).

首先再 spring boot 項(xiàng)目的啟動(dòng)類中 創(chuàng)建 BrokerServer ,綁定端口號(hào),然后啟動(dòng)

@SpringBootApplication
public class RabbitmqProjectApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(RabbitmqProjectApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}

編寫消費(fèi)者

public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {//建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創(chuàng)建交換機(jī)和隊(duì)列(這里和生產(chǎn)者創(chuàng)建交換機(jī)和隊(duì)列不沖突,誰先啟動(dòng),就按照誰的創(chuàng)建,即使已經(jīng)存在交換機(jī)和隊(duì)列,再創(chuàng)建也不會(huì)有什么副作用)channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("demoQueue", true, false, false, null);//消費(fèi)者消費(fèi)消息channel.basicConsume("demoQueue", true, new Consumer() {@Overridepublic void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("開銷消費(fèi)");System.out.println("consumerTag=" + consumerTag);System.out.println("body=" + new String(body));System.out.println("消費(fèi)完畢");}});//由于消費(fèi)者不知道生產(chǎn)者要生產(chǎn)多少,就在這里通過循環(huán)模擬一直等待while(true) {Thread.sleep(500);}}}

編寫生產(chǎn)者

public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {//建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創(chuàng)建交換機(jī)和隊(duì)列(這里和消費(fèi)者創(chuàng)建交換機(jī)和隊(duì)列不沖突,誰先啟動(dòng),就按照誰的創(chuàng)建,即使已經(jīng)存在交換機(jī)和隊(duì)列,再創(chuàng)建也不會(huì)有什么副作用)channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("demoQueue", true, false, false, null);//生產(chǎn)消息byte[] body1 = "Im cyk1 !".getBytes();channel.basicPublish("demoExchange", "demoQueue", null, body1);Thread.sleep(500);//關(guān)閉連接channel.close();connection.close();}}

2.1、實(shí)例演示

啟動(dòng) spring boot 項(xiàng)目(啟動(dòng) BrokerServer)

運(yùn)行消費(fèi)者(消費(fèi)者和生產(chǎn)者誰先后運(yùn)行都可以)

?

運(yùn)行生產(chǎn)者

?

?

http://m.risenshineclean.com/news/61892.html

相關(guān)文章:

  • 怎樣做能直接上傳微信的視頻網(wǎng)站莆田網(wǎng)站建設(shè)優(yōu)化
  • 做社交網(wǎng)站開發(fā)怎么建網(wǎng)站免費(fèi)的
  • 網(wǎng)站開發(fā)代做外貿(mào)網(wǎng)站制作推廣
  • 沒網(wǎng)站域名可以做備案嗎百度熱度
  • 塘沽網(wǎng)站建設(shè)網(wǎng)站建設(shè)方案內(nèi)容
  • 域名怎么解析到網(wǎng)站網(wǎng)絡(luò)營銷策略的制定
  • 新民正規(guī)網(wǎng)站建設(shè)價(jià)格咨詢高級(jí)seo是什么職位
  • 高端網(wǎng)站設(shè)計(jì)建站找個(gè)免費(fèi)網(wǎng)站這么難嗎
  • 青海省建設(shè)網(wǎng)站多少錢今日頭條(官方版本)
  • b2b電子商務(wù)平臺(tái)選擇有哪些seo網(wǎng)站平臺(tái)
  • 格爾木市住房和城鄉(xiāng)建設(shè)局網(wǎng)站做專業(yè)搜索引擎優(yōu)化
  • 網(wǎng)站外鏈帶nofollow是什么意思網(wǎng)站快速優(yōu)化排名官網(wǎng)
  • 網(wǎng)絡(luò)技術(shù)與網(wǎng)站建設(shè)seo網(wǎng)站優(yōu)化課程
  • 東南網(wǎng)架公司哈爾濱seo和網(wǎng)絡(luò)推廣
  • 貴港網(wǎng)站設(shè)計(jì)免費(fèi)發(fā)布網(wǎng)站seo外鏈
  • 外貿(mào)添加外鏈網(wǎng)站建站網(wǎng)站
  • 有ip怎么用自己的主機(jī)做網(wǎng)站灰色項(xiàng)目推廣渠道
  • 引用網(wǎng)站的內(nèi)容如何做注釋排行榜軟件
  • 寧波市住房與城鄉(xiāng)建設(shè)部網(wǎng)站百度推廣一天燒幾千
  • 專業(yè)攝影網(wǎng)站杭州網(wǎng)站優(yōu)化服務(wù)
  • 高端服裝產(chǎn)品網(wǎng)站建設(shè)seo建站平臺(tái)哪家好
  • 網(wǎng)站策劃步驟網(wǎng)站交易
  • 個(gè)人網(wǎng)站設(shè)計(jì)成首頁網(wǎng)絡(luò)營銷的實(shí)現(xiàn)方式
  • 網(wǎng)站建設(shè) 事業(yè)單位 安全外貿(mào)網(wǎng)站平臺(tái)都有哪些 免費(fèi)的
  • JAVA網(wǎng)站開發(fā)ssm朝陽區(qū)seo搜索引擎優(yōu)化介紹
  • 網(wǎng)站建設(shè)公司哪好建站優(yōu)化推廣
  • 客戶做網(wǎng)站需要提供什么百度收錄提交申請(qǐng)網(wǎng)站
  • 網(wǎng)站后臺(tái) ftp網(wǎng)站建設(shè)seo優(yōu)化培訓(xùn)
  • 網(wǎng)站開發(fā)下載現(xiàn)在最火的推廣平臺(tái)
  • 網(wǎng)站開發(fā)如何入賬sem優(yōu)化師