- 論壇徽章:
- 0
|
本帖最后由 redor 于 2011-06-04 11:47 編輯
目前這個(gè)任務(wù)調(diào)度只負(fù)責(zé)任務(wù)的數(shù)據(jù)ID或者數(shù)據(jù)ID打包,不負(fù)責(zé)存儲(chǔ)數(shù)據(jù)本身,需要單獨(dú)
的數(shù)據(jù)存儲(chǔ),任務(wù)計(jì)算節(jié)點(diǎn)需要自行設(shè)計(jì)數(shù)據(jù)存儲(chǔ)部分,這個(gè)程序包會(huì)提供一個(gè)獲取任務(wù)
和提交任務(wù)的客戶端接口libmtask,mtask.h;具體的實(shí)現(xiàn)參考btask.c 這個(gè)就是一個(gè)
benchmark。任務(wù)調(diào)度服務(wù)啟動(dòng)/usr/sbin/qtaskd -d -c /etc/qtaskd.ini
然后通過(guò)瀏覽器打開(kāi)http://serverIP:2080/ 頁(yè)面上可以添加任務(wù),這些任務(wù)有編號(hào),會(huì)顯示任務(wù)目前的完成情況。
btask使用方法就是對(duì)照自己配置好的任務(wù),任務(wù)計(jì)算節(jié)點(diǎn)可以有三種類(lèi)型:
1. 只提交任務(wù),比如數(shù)據(jù)源頭,這個(gè)節(jié)點(diǎn)不獲取任務(wù)(./btask -h IP -p 2066 -m 0 -q 1 -d);
2. 獲取任務(wù)計(jì)算完成以后提交一個(gè)完成通知,另外同時(shí)提交給下一個(gè)任務(wù)(./btask -h IP -p 2066 -m 1 -q 2 -d);
3. 結(jié)束計(jì)算節(jié)點(diǎn),只獲取任務(wù)完成之后提交完成通知,不需要把任務(wù)繼續(xù)提交給一個(gè)任務(wù)(./btask -h IP -p 2066 -m 2 -q 0 -d)。
abcd.jpg (66.38 KB, 下載次數(shù): 0)
下載附件
2011-06-02 15:17 上傳
下載:
SRPM打包:http://libibase.googlecode.com/files/srpms-20110603183000.tar.gz
RPM打包: http://libibase.googlecode.com/files/rpms-20110603183000.tar.gz
源碼: http://libibase.googlecode.com/files/qmtask-0.0.5.tar.gz
客戶端API
- /* set message task */
- int mtask_set(MTASK *mtask, char *ip, int port, int mtaskid, int qtaskid);
- /* connect to qtask
- * -1 mtask is NULL
- * -2 socket() failed
- * -3 connect() failed
- * */
- int mtask_connect(MTASK *mtask);
- /* get new task count
- * -1 mtask is NULL
- * -2 mtask->fd <= 0 and mtask_connect failed
- * -3 write() task header failed
- * -4 write() task list[] failed
- * -5 read task header failed
- * -6 malloc failed
- * -7 read task list[] failed
- * ret >= 0 mhead.packetid
- * */
- int mtask_commit(MTASK *mtask, int flag, char *packet, int packet_len);
- /* push packet
- * return value
- * -1 mtask is NULL
- * -2 connection is bad
- * -3 Invalid packet data
- * -4 write() header failed
- * -5 write() packet failed
- * -6 recv() header failed
- * */
- int mtask_push(MTASK *mtask, int flag, char *packet, int packet_len);
- /* pop packet
- * return value
- * -1 mtask is NULL
- * -2 connection is bad
- * -3 Invalid commitid
- * -4 write() header failed
- * -5 recv() header failed
- * -6 malloc() for packet failed
- * -7 recv() packet failed
- */
- int mtask_pop(MTASK *mtask);
- /* over packet
- * return value
- * -1 mtask is NULL
- * -2 connection is bad
- * -3 Invalid packetid and commitid
- * -4 write() header failed
- * -5 recv() header failed
- * */
- int mtask_finish(MTASK *mtask, int flag);
- /* close message task */
- void mtask_close(MTASK *mtask);
復(fù)制代碼 寫(xiě)了一個(gè)調(diào)用的例子,不包括數(shù)據(jù)存取部分,演示了ID包的任務(wù)提交, 任務(wù)獲取, 任務(wù)完成, 還在弄具體的數(shù)據(jù)計(jì)算系統(tǒng),弄完給大家分享。
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <errno.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <sys/types.h>
- #include "mtask.h"
- int main(int argc, char **argv)
- {
- int i = 0, x = 0, mid = 0, qid = 0, port = 0, flag = 0,
- isdaemon = 0, isout = 0, len = 0, packetid = 0;
- char *ip = NULL, *packet = NULL, block[MTASK_PACKET_MAX * sizeof(int64_t)], ch = 0;
- int64_t old = 0, *list = NULL;
- MTASK mtask = {0};
- pid_t pid = 0;
- /* get configure file */
- while((ch = getopt(argc, argv, "h:p:m:q:d")) != -1)
- {
- switch(ch)
- {
- case 'h':
- ip = optarg;
- break;
- case 'p':
- port = atoi(optarg);
- break;
- case 'm':
- mid = atoi(optarg);
- break;
- case 'q':
- qid = atoi(optarg);
- break;
- case 'o':
- isout = 1;
- break;
- case 'd':
- isdaemon = 1;
- break;
- default:
- break;
- }
- }
- if(ip == NULL || port <= 0 || (qid == 0 && mid == 0))
- {
- fprintf(stderr, "Usage:%s -h host -p port -m commitid -q queueid -o output -d working as daemon\n", argv[0]);
- _exit(-1);
- }
- /* daemon */
- if(isdaemon)
- {
- pid = fork();
- switch (pid) {
- case -1:
- perror("fork()");
- exit(EXIT_FAILURE);
- break;
- case 0: /* child process */
- if(setsid() == -1)
- exit(EXIT_FAILURE);
- break;
- default:/* parent */
- _exit(EXIT_SUCCESS);
- break;
- }
- }
- if(mtask_set(&mtask, ip, port, mid, qid) == 0
- && mtask_connect(&mtask) == 0)
- {
- list = (int64_t *)block;
- if(mid <= 0 && qid > 0)
- {
- do
- {
- old = random();
- flag = 0;
- if((old%33) == 0) flag = MTASK_TO_QHEAD;
- x = 0;
- while(x < MTASK_PACKET_MAX)
- {
- list[x++] = (int64_t)random();
- }
- if((packetid = mtask_push(&mtask, flag, block,
- sizeof(int64_t) * MTASK_PACKET_MAX)) >= 0)
- {
- if(isout)fprintf(stdout, "1:{%d:{packetid:%d}}\n", i, packetid);
- ++i;
- }
- else
- {
- sleep(1);
- }
- }while(1);
- }
- else
- {
- /*
- packetid = 0;
- do
- {
- old = random();
- flag = 0;
- if((old%33) == 0) flag = MTASK_TO_QHEAD;
- if((packetid = mtask_commit(&mtask, flag, NULL, 0)) > 0)
- {
- packet = mtask.packet;
- len = mtask.length;
- if(isout)fprintf(stdout, "2:{%d:{packetid:%d length:%d}}\n", i, packetid, len);
- ++i;
- }
- else
- {
- packet = NULL;
- len = 0;
- sleep(1);
- }
- }while(1);
- */
- packetid = 0;
- do
- {
- old = random();
- flag = 0;
- if((old%33) == 0) flag = MTASK_TO_QHEAD;
- if((packetid = mtask_pop(&mtask)) > 0)
- {
- packet = mtask.packet;
- len = mtask.length;
- if(isout)fprintf(stdout, "2:{%d:{packetid:%d length:%d}}\n", i, packetid, len);
- ++i;
- mtask_finish(&mtask, flag);
- }
- else
- {
- packet = NULL;
- len = 0;
- sleep(1);
- }
- }while(1);
- }
- mtask_close(&mtask);
- }
- return 0;
- }
復(fù)制代碼 |
|