電商網(wǎng)站設(shè)計的原則網(wǎng)絡(luò)營銷模式
目錄
2、進程池
1)理解進程池
?2)進程池的實現(xiàn)
整體框架:
a. 加載任務(wù)
b. 先描述,再組織
I. 先描述
II. 再組織
c. 創(chuàng)建信道和子進程
d. 通過channel控制子進程
e. 回收管道和子進程
問題1:
解答1:
問題2:
解答2:
f. 將進程池本身和任務(wù)文件本身進行解耦
3)完整代碼
processpool.cc:
Task.hpp:
Makefile:
命令行中的 | ,就是匿名管道
它們的父進程都是bash
2、進程池
1)理解進程池
a. 可以將任務(wù)寫入管道來給到子進程,從而可以提前創(chuàng)建子進程想讓哪個子進程完成任務(wù),我就讓寫入到哪個子進程相對的管道中
b. 管道里面沒有數(shù)據(jù),worker進程就在阻塞等待,等待務(wù)的到來!!
c. master向哪一個管道進行寫入,就是喚醒哪一個子進程來處理任務(wù)
d. 均衡的向后端子進程劃分任務(wù),稱之為負載均衡父進程要進行后端任務(wù)的負載均衡
父進程直接向管道里寫入固定長度的四字節(jié)(int)數(shù)組下標(biāo)(任務(wù)碼)
函數(shù)指針數(shù)組中元素分別指向不同的任務(wù),以便master控制worker完成指定工作
?2)進程池的實現(xiàn)
整體框架:
// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加載任務(wù)std::vector<Channel> channels; // 將管道組織起來// 1.創(chuàng)建信道和子進程CreateChannelAndSub(num, &channels);// 2.通過channel控制子進程CtrlProcess(channels, 10);// 3.回收管道和子進程 a.關(guān)閉所有的寫端 b.回收子進程ClearUpChannel(channels);return 0;
}
a. 加載任務(wù)
我這里用的是打印的方式來模擬任務(wù)的分配,通過打印從而了解子進程執(zhí)行任務(wù)的情況,通過種下隨機數(shù)種子,產(chǎn)生隨機數(shù),進而隨機的向子進程分配任務(wù),work即為子進程需要做的工作
Task.hpp:
#pragma once#include <iostream> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函數(shù)指針void Print() {std::cout << "I am print task" << std::endl; } void DownLoad() {std::cout << "I am a download task" << std::endl; } void Flush() {std::cout << "I am a flush task" << std::endl; }task_t tasks[TaskNum];void LoadTask() {srand(time(nullptr) ^ getpid() ^ 177); // 種一個隨機種子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush; }void ExcuteTask(int number) {if(number < 0 || number > 2) return;tasks[number](); }int SelectTask() {return rand() % TaskNum; }void work(int rfd) {while (true){int command = 0;int n = read(rfd, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}} }
// 命令行規(guī)范 --> ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加載任務(wù)return 0;
}
b. 先描述,再組織
I. 先描述
需要控制的信道(即發(fā)送端wfd)數(shù)量多且繁瑣,需要管理起來從而方便控制給子進程發(fā)送任務(wù)
class Channel { private:int _wfd;int _subprocessid;std::string _name; };
在信道中,我們需要知道發(fā)送的文件描述符wfd,還有知道子進程的pid,以及信道的命名(用來區(qū)分信道)
II. 再組織
std::vector<Channel> channels;
我們通過用一個vector數(shù)組將所有的Channel存儲起來,從而實現(xiàn)對它們的增刪查改,以方便管理
c. 創(chuàng)建信道和子進程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{for (int i = 0; i < num; ++i){// 1.創(chuàng)建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0) exit(1); // 創(chuàng)建管道失敗// 2.創(chuàng)建子進程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);work();close(pipefd[0]);exit(0);}// 3.構(gòu)建一個名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子進程的pid b.父進程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}
用命令行參數(shù)的方式傳入得到的argv[1]即為輸入命令需要的子進程和管道個數(shù)
通過for循環(huán),創(chuàng)建 num個 pipe管道以及子進程,當(dāng)創(chuàng)建完子進程時,需要關(guān)閉掉不需要的文件描述符(即wfd -- pipefd[1])(當(dāng)然,父進程也需要關(guān)閉不需要的fd -- rfd),在執(zhí)行完work(子進程的工作)之后,關(guān)閉掉rfd(即工作完成了,關(guān)閉其管道),然后exit(0)退出進程,等待父進程回收
d. 通過channel控制子進程
// 輪詢方案 -- 負載均衡
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}// 發(fā)送相應(yīng)的任務(wù)碼到對應(yīng)管道內(nèi)
void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 選擇一個任務(wù)int taskcommand = SelectTask();// b. 選擇一個信道和進程int channel_index = NextChannel(channels.size());// c. 發(fā)送任務(wù)SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}
向其發(fā)送任務(wù)之前,我們需要先選擇一個任務(wù),通過隨機種子隨機數(shù)的方式,隨機選擇我們的一個任務(wù),拿到其任務(wù)碼(即指針數(shù)組下標(biāo)),然后選擇相應(yīng)的信道和進程(信道和進程一體的),從而向管道發(fā)送任務(wù)碼給子進程
e. 回收管道和子進程
void ClearUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}
我們先將所有的信道關(guān)閉,然后在逐個將子進程等待回收
問題1:
那為什么不能邊關(guān)閉信道邊回收呢??
即
解答1:
在我們創(chuàng)建子進程的過程中,由于父子進程之間的繼承,從而導(dǎo)致子進程會擁有父進程的文件描述符內(nèi)容(即指向同一地方),如果我們邊關(guān)閉邊回收的話,如上圖所示,當(dāng)我們關(guān)閉父進程的第一個管道的wfd時,這時候第一個管道的讀端的引用計數(shù)并未清0,因為子進程2它繼承了父進程指向第一個管道的wfd(讀端),從而使得讀端阻塞,進程不退出,然后wait子進程的時候就會阻塞
在work結(jié)束后,才會到下一步close和exit退出子進程;
work結(jié)束需要的條件是 n == 0,即讀端返回值為0,即
因此上述那種邊關(guān)閉信道,邊wait子進程的方法會阻塞
問題2:
為什么這種方法又能成功回收呢??
解答2:
因為當(dāng)我們將所有信道關(guān)閉時,關(guān)閉到最后一個子進程對應(yīng)的管道的wfd的時候,該管道的讀端的引用計數(shù)就會為0,從而讀端讀到0,該子進程退出,隨子進程退出就會使得該子進程指向的前面管道的讀端回收,就不會造成前面那種情況
f. 將進程池本身和任務(wù)文件本身進行解耦
用回調(diào)函數(shù)可以很好的改善代碼的耦合性
通過文件描述符重定向 dup2,將標(biāo)準輸入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回調(diào)task()函數(shù)
// 重定向
這樣做可以徹底的讓我們的子進程執(zhí)行對應(yīng)的work時,再也不需要知道有什么管道的讀端
(不用管從哪里接收信息,直接認為從標(biāo)準輸入拿到信息即可)
--- 將管道的邏輯和執(zhí)行任務(wù)的邏輯進一步進行解耦
// task_t task : 回調(diào)函數(shù)
有了它,我們進程池本身的代碼和我們?nèi)蝿?wù)本身兩個文件就徹底解耦了
--- 即既不關(guān)心從哪個文件描述符,直接默認從0里面去讀,也不關(guān)心將來誰調(diào)它,因為子進程會自動回調(diào)它
3)完整代碼
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel() { close(_wfd); }void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;int _subprocessid;std::string _name;
};// 形參和命名規(guī)范
// const & : 輸入型參數(shù)
// & : 輸入輸出型參數(shù)
// * : 輸出型參數(shù)// task_t task : 回調(diào)函數(shù)
// 有了它,我們進程池本身的代碼和我們?nèi)蝿?wù)本身兩個文件就徹底解耦了
// --- 即既不關(guān)心從哪個文件描述符,直接默認從0里面去讀,也不關(guān)心將來誰調(diào)它,因為子進程會自動回調(diào)它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < num; ++i){// 1.創(chuàng)建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1); // 創(chuàng)建管道失敗// 2.創(chuàng)建子進程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);dup2(pipefd[0], 0);task();close(pipefd[0]);exit(0);}// 3.構(gòu)建一個名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子進程的pid b.父進程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}int NextChannel(int channelnum) // 輪詢方案 -- 負載均衡
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 選擇一個任務(wù)int taskcommand = SelectTask();// b. 選擇一個信道和進程int channel_index = NextChannel(channels.size());// c. 發(fā)送任務(wù)SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()<< " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}void ClearUpChannel(std::vector<Channel> &channels)
{// for (auto &channel : channels)// {// channel.CloseChannel();// channel.Wait();// }// int num = channels.size() -1;// while(num >= 0)// {// channels[num].CloseChannel();// channels[num--].Wait();// }for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加載任務(wù)std::vector<Channel> channels; // 將管道組織起來// 1.創(chuàng)建信道和子進程CreateChannelAndSub(num, &channels, work);// 2.通過channel控制子進程CtrlProcess(channels, 10);// 3.回收管道和子進程 a.關(guān)閉所有的寫端 b.回收子進程ClearUpChannel(channels);// // for test// for(auto &channel : channels)// {// std::cout << "====================" << std::endl;// std::cout << channel.GetName() << std::endl;// std::cout << channel.GetWfd() << std::endl;// std::cout << channel.GetProcessId() << std::endl;// }// sleep(100);return 0;
}
Task.hpp:
#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函數(shù)指針void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 177); // 種一個隨機種子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if(number < 0 || number > 2) return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is: " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process: " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }// 這樣做可以徹底的讓我們的子進程執(zhí)行對應(yīng)的work時,再也不需要知道有什么管道的讀端
// (不用管從哪里接收信息,直接認為從標(biāo)準輸入拿到信息即可)
// 將管道的邏輯和執(zhí)行任務(wù)的邏輯進一步進行解耦
void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}}
}
Makefile:
processpool:processpool.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool