I designed a server that started out single-process; it handles 20k requests/s without any problem. To make full use of multiple CPUs, I want to turn it into a multi-process server.
The change is small: the parent process first creates the listen objects and starts listening on the ports, then forks the child processes. Each child creates its own epoll instance and watches the listen_fd, the accepted fds, and the fds used to talk to the various backend servers.
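For illustration, here is a bare-syscall sketch of that layout. It is hypothetical (it does not use the project's CPollerObject/CPollerUnit classes), and the port number and child count are placeholders.

/* Parent creates and listens on the socket, forks one child per CPU,
 * and every child builds its own epoll around the inherited listen fd. */
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>

static int make_listen_fd(unsigned short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port        = htons(port);
    bind(fd, (struct sockaddr *)&addr, sizeof(addr));
    listen(fd, 128);
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);  /* non-blocking accept */
    return fd;
}

static void child_loop(int listen_fd)
{
    int epfd = epoll_create(1024);               /* each child owns its own epoll */
    struct epoll_event ev = {0};
    struct epoll_event events[64];
    ev.events  = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
    for (;;) {
        int n = epoll_wait(epfd, events, 64, 1000);
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == listen_fd) {
                int cfd = accept(listen_fd, NULL, NULL);
                if (cfd >= 0) { /* register cfd in this child's epoll ... */ }
            } else {
                /* handle I/O on an already accepted fd or a backend fd */
            }
        }
    }
}

int main(void)
{
    int listen_fd = make_listen_fd(8080);        /* placeholder port */
    for (int i = 0; i < 4; i++)                  /* one child per CPU */
        if (fork() == 0) { child_loop(listen_fd); _exit(0); }
    for (;;) pause();                            /* parent only supervises */
    return 0;
}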
Here is the problem: say we start 4 child processes (one per CPU) and stress-test with a client running 10 threads. At the beginning of the test the client always sees a small number of receive timeouts, i.e. no response packet arrives. After a while it runs smoothly, with no timeouts and a steady load.
While debugging I disabled the interaction with the backend servers and replied locally as soon as a packet was received; there were still a few timeouts.
I don't understand what is going on and am hoping an expert can explain. Please don't tell me to switch to multi-threading, or to have only the parent process epoll the listen_fd; I know those designs. I just want to understand what can go wrong under this particular one.
Also, I've heard that the Linux 2.6 kernel already fixed the thundering-herd problem for this pattern, but can events get misrouted, i.e. can a fd belonging to child A show up in child B's epoll events?
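For reference, a minimal sketch of how a child can drain accepts defensively when the inherited listen fd is non-blocking: if a sibling process wins the race for a pending connection, this child simply sees EAGAIN and goes back to epoll_wait, so a spurious wake-up costs one failed accept rather than a hang. drain_accepts is an illustrative helper and is not part of the code below.

#include <sys/socket.h>
#include <errno.h>

static void drain_accepts(int listen_fd)
{
    for (;;) {
        int cfd = accept(listen_fd, NULL, NULL);
        if (cfd < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;              /* a sibling already took the connection */
            if (errno == EINTR)
                continue;           /* interrupted by a signal, retry */
            break;                  /* real error: log it and bail out */
        }
        /* make cfd non-blocking and register it in this child's epoll ... */
    }
}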
Simplified example code follows:
/* A single process can listen on multiple local ports; one object is created per
 * port and kept in a vector so it can be handed to the children, and every child
 * creates its own epoll and watches the listen_fd. */
vector<CPollerObject*> g_listen_obj;

int InitGlobalInstance()
{
    pid_t pid = getpid();
    int iRet;

    /* Start epoll. */
    iRet = CPollerUnit::Ins()->Create(g_conf.m_epoll._maxfd, g_conf.m_epoll._timeout);
    if (E_OK != iRet) {
        fprintf(stderr, "Create Epoll Error:%d\n", iRet);
        return E_FAIL;
    }
    DEBUG_LOG("CPollerUnit Create Succ: %p", CPollerUnit::Ins());

    /* Attach every listen object to this process's epoll. */
    for (vector<CPollerObject*>::iterator it = g_listen_obj.begin(); it != g_listen_obj.end(); ++it) {
        (*it)->EnableInput();
        (*it)->AttachPoller();
    }

    /* Initialize the memory-pool manager. */
    iRet = CMemPool::Ins()->Create(g_conf.m_session._bufSize);
    if (E_OK != iRet) {
        fprintf(stderr, "Create MemPool Error:%d\n", iRet);
        return E_FAIL;
    }
    DEBUG_LOG("CMemPool Create Succ: %p", CMemPool::Ins());

    /* Initialize the connection pool. */
    iRet = CTcpPoll::Ins()->Init(g_conf.m_session._max, g_conf.m_errReport._failCount, g_conf.m_errReport._timeInterval);
    if (E_OK != iRet) {
        fprintf(stderr, "Create CTcpPoll Error:%d\n", iRet);
        return E_FAIL;
    }
    DEBUG_LOG("CTcpPoll Create Succ: %p", CTcpPoll::Ins());

    /* Initialize the statistics object (one file per pid). */
    char pathWithPid[512] = {'\0'};
    snprintf(pathWithPid, sizeof(pathWithPid) - 1, "%s_%d", g_conf.m_stat._prefix, pid);
    iRet = CProxyStat::Instance().Init(g_conf.m_stat._path, pathWithPid);
    if (E_OK != iRet) {
        fprintf(stderr, "Create CProxyStat Error:%d\n", iRet);
        return E_FAIL;
    }
    DEBUG_LOG("CProxyStat Create Succ: %p", &(CProxyStat::Instance()));

    return 0;
}
int MainThread()
{
    InitGlobalInstance();
    time_t tNow = time(NULL);
    while (true) {
        _g_child_flag = 1;
        /* Handle connection events. */
        CPollerUnit::Ins()->ProcessAllEvents();

        /* Session timeout check. */
        CSession::Ins()->CheckTimeOut();
        /* Reclaim idle tcp connections. */
        CTcpPoll::Ins()->CheckIdle(g_conf.m_session._idle, g_conf.m_session._timeInterval, g_conf.m_session._emptyRatio);
        /* Report statistics once the configured interval has elapsed. */
        if (int(time(NULL) - tNow) < g_conf.m_stat._interval) continue;
        CProxyStat::Instance().ProcessStat();
        tNow = time(NULL);
    }

    return -1;
}
int fork_process_group()
{
    /* Process-group configuration. */
    int cpu_process_num = sysconf(_SC_NPROCESSORS_CONF);
    int process_group_num = g_conf.m_iProcessNum;

    /* If no process count is configured, or it exceeds the CPU count, clamp it to the CPU count. */
    if (0 == process_group_num || process_group_num > cpu_process_num)
        process_group_num = cpu_process_num;
    ERROR_LOG("cpu num: %d, config process num: %d, real process num: %d", cpu_process_num, g_conf.m_iProcessNum, process_group_num);

    int pid;
    for (int i = 0; i < process_group_num; i++) {
        pid = fork();
        switch (pid) {
        case -1:
            ERROR_LOG("fork error!");
            return -1;
        case 0:
            /* Child: bind to its CPU and enter the event loop (never returns). */
            SetRunCpu(i);
            MainThread();
            return 0;
        default:
            /* Parent: remember which CPU this child is pinned to. */
            g_processcpu.insert(make_pair(pid, i));
        }
    }
    return 0;
}
int main(int argc, char** argv)
{
    if (argc < 2) {
        fprintf(stderr, "usage:%s etc\n", argv[0]);
        return -1;
    }
    /* Print version information. */
    if (0 == strcasecmp(argv[1], "-v")) {
        PrintVersion();
        return 0;
    }
    PrintVersion();

    /* Initialize: read the configuration file. */
    int iRet = g_conf.Init(argv[1]);
    if (E_OK != iRet) {
        fprintf(stderr, "Init Conf Error!:%d\n", iRet);
        return E_FAIL;
    }
    /* Initialize logging. */
    log_init(g_conf.m_log._path, g_conf.m_log._level, g_conf.m_log._size, g_conf.m_log._prefix);
    /* Install the signal handler. */
    SetUser1Handle();
    /* CPU binding (disabled here: each child binds itself in fork_process_group). */
    //if (g_conf.m_iCpuBind >= 0) SetRunCpu(g_conf.m_iCpuBind);
    /* Create the listen sockets before forking. */
    if (CreateClientSocket() < 0) {
        fprintf(stderr, "CreateClientSocket\n");
        return E_FAIL;
    }
    DEBUG_LOG("CreateClientSocket Succ!");

    /* Daemonize, then fork the worker processes. */
    daemon_init();
    fork_process_group();

#ifndef DEBUG
    /* Parent: supervise the children and restart any worker that exits. */
    pid_t pid;
    for (;;) {
        pid = waitpid(-1, NULL, 0);
        if (pid <= 0) {
            ERROR_LOG("waitpid error:%s\n", strerror(errno));
            sleep(1);
            continue;
        }
        fflush(NULL);

        ERROR_LOG("WorkMain[%d] restart...................................\n", pid);
        usleep(100000);

        /* Re-pin the replacement child to the same CPU as the one that died. */
        int cpunum = g_processcpu.find(pid) == g_processcpu.end() ? 0 : g_processcpu[pid];
        int newpid = fork();
        switch (newpid) {
        case -1:
            ERROR_LOG("fork error\n");
            break;
        case 0:
            SetRunCpu(cpunum);
            MainThread();
            break;
        default:
            g_processcpu[newpid] = cpunum;
            break;
        }
    }
#endif

    return 0;
}