亚洲av成人无遮挡网站在线观看,少妇性bbb搡bbb爽爽爽,亚洲av日韩精品久久久久久,兔费看少妇性l交大片免费,无码少妇一区二区三区

  免費(fèi)注冊 查看新帖 |

Chinaunix

  平臺 論壇 博客 文庫
最近訪問板塊 發(fā)新帖
查看: 5919 | 回復(fù): 2
打印 上一主題 下一主題

在PostgreSQL+plproxy中通過修改數(shù)據(jù)路由的方法實(shí)現(xiàn)的高可用方案 [復(fù)制鏈接]

論壇徽章:
3
數(shù)據(jù)庫技術(shù)版塊每日發(fā)帖之星
日期:2015-06-18 22:20:00數(shù)據(jù)庫技術(shù)版塊每日發(fā)帖之星
日期:2015-06-21 22:20:00數(shù)據(jù)庫技術(shù)版塊每日發(fā)帖之星
日期:2015-08-27 06:20:00
跳轉(zhuǎn)到指定樓層
1 [收藏(0)] [報(bào)告]
發(fā)表于 2011-11-20 17:10 |只看該作者 |倒序?yàn)g覽
本帖最后由 osdba 于 2011-11-20 17:17 編輯

此文章也可見我的blog: http://blog.osdba.net/?post=72

    我們知道plproxy本身并不提供高可用方案。需要使用第三方的軟件來實(shí)現(xiàn)高可用方案。這里提供了一種通過寫一個(gè)簡單的python腳本的來探測后面的數(shù)據(jù)庫的狀態(tài),如果發(fā)現(xiàn)后面的數(shù)據(jù)庫壞了,就讓其修改數(shù)據(jù)路由,將其切換到備數(shù)據(jù)庫的方法。這里提供的方案數(shù)據(jù)并不做水平拆分,而是做垂直拆分的方案。也就是按業(yè)務(wù)拆分?jǐn)?shù)據(jù),A業(yè)務(wù)的數(shù)據(jù)放在第一臺數(shù)據(jù)庫上,B業(yè)務(wù)的數(shù)據(jù)放到第二臺數(shù)據(jù)庫上。當(dāng)然,如果想做數(shù)據(jù)水平拆分,只要修改一下數(shù)據(jù)路由的hash函數(shù),也是很容易做到的。

機(jī)器說明:

下面兩臺機(jī)器做plproxy代理庫

192.168.10.31: proxydb,安裝plproxy,做函數(shù)代理

192.168.10.32: proxydb,安裝plproxy,做函數(shù)代理

下面4臺機(jī)器放真實(shí)的業(yè)務(wù)數(shù)據(jù)
192.168.10.33: db01

192.168.10.34: db01

192.168.10.35: db01

192.168.10.36: db01


按plproxy的標(biāo)準(zhǔn)方法,在plproxy代理庫,建plproxy的3個(gè)標(biāo)準(zhǔn)函數(shù):

create or replace function plproxy.get_cluster_version(cluster_name text)

returns integer as $$

begin

    if cluster_name = 'cluster01' then

         return 1;

    end if;

    raise exception 'no such cluster: %', cluster_name;

end;

$$ language plpgsql;



create or replace function plproxy.get_cluster_config(cluster_name text, out key text, out val text)

returns setof record as $$

begin

    key := 'statement_timeout';

    val := 60;

    return next;

    return;

end; $$ language plpgsql;


CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text)

RETURNS SETOF text AS $$

BEGIN

    IF cluster_name = 'cluster01' THEN

        RETURN NEXT 'dbname=db01 host=192.168.10.33 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.34 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.35 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.36 user=buser password=buser';

        RETURN;

    END IF;

    RAISE EXCEPTION 'Unknown cluster';

END;

$$ LANGUAGE plpgsql;

在這個(gè)集群中,有4臺機(jī)器:

192.168.10.33 192.168.10.34 :  放業(yè)務(wù)A的數(shù)據(jù),192.168.10.33為主庫,192.168.10.34為備庫

192.168.10.35 192.168.10.36:放業(yè)務(wù)B的數(shù)據(jù),192.168.10.35為主庫,192.168.10.36為備庫。

在proxydb代理庫上建主備庫的關(guān)系表:

create table ha_config(

ha_groupid int primary key, --每一對主備庫,分配一個(gè)組id,

primary_ip text, --HA組中主數(shù)據(jù)庫的IP

standby_ip text,--HA組中備數(shù)據(jù)庫的IP

primary_hostid int,--HA組中主數(shù)據(jù)庫的ID,也就是在plproxy.get_cluster_partitions函數(shù)中主機(jī)的順序號

standby_hostid int,--HA組中備數(shù)據(jù)庫的ID,

current_hostid int,--當(dāng)前HA在哪臺機(jī)器上

primary_hearttime timestamp, --主數(shù)據(jù)庫的心跳時(shí)間,表明探測程序在這個(gè)時(shí)間探測時(shí),數(shù)據(jù)庫是好的。

standby_hearttime timestamp);--備數(shù)據(jù)庫的心跳時(shí)間,表明探測程序在這個(gè)時(shí)間探測時(shí),數(shù)據(jù)庫是好的。

數(shù)據(jù)節(jié)點(diǎn)有兩組互為主備的節(jié)點(diǎn),插入兩行數(shù)據(jù):

insert into ha_config values(1,'192.168.10.33','192.168.10.34',0,1,0,now(),now());

insert into ha_config values(2,'192.168.10.35','192.168.10.36',2,3,2,now(),now());



建一個(gè)函數(shù)路由表,表明這個(gè)這個(gè)函數(shù)應(yīng)該被路由到哪個(gè)HA組上的主機(jī)上:

create table function_route(

funcname text primary key,

ha_groupid int);

建hauser用戶,此用戶用來在proxydb上修改路由:

create user hauser password 'hauser';

grant ALL on ha_config to hauser;

grant ALL on function_route to hauser;



建可以根據(jù)ha_config表中current_hostid來計(jì)算路由的hash函數(shù)plp_route_by_funcname:

CREATE OR REPLACE FUNCTION public.plp_route_by_funcname(key text) returns int

AS $$

declare

ret int;

begin

     SELECT current_hostid INTO ret FROM function_route a,ha_config b where a.funcname=key and a.ha_groupid=b.ha_groupid;

     return ret;

end;

$$ LANGUAGE plpgsql;


在每個(gè)數(shù)據(jù)節(jié)點(diǎn)的數(shù)據(jù)庫上:

建hauser,后面的python程序用此用戶來探測這個(gè)數(shù)據(jù)庫是否還活著:

create user hauser password 'hauser';

建立心跳表,hauser用戶來更新這個(gè)表,如果不能更新,則說明這個(gè)數(shù)據(jù)庫壞了:

create table xdual(x timestamp);

insert into xdual values(now());

grant ALL on xdual to hauser;



在proxydb上建一個(gè)測試函數(shù):get_username,此函數(shù)根據(jù)userid得到用戶名:

CREATE OR REPLACE FUNCTION public.get_username(userid int) returns text

AS $$

CLUSTER 'cluster01';

RUN ON plp_route_by_funcname('get_username');

$$ LANGUAGE plproxy;



在function_route中插入記錄:

insert into function_route values('get_username',1);

這條記錄是把函數(shù)get_username的調(diào)用路由到了HA組1中,也就是“192.168.10.33,192.168.10.34”的HA組中,具體是路由到192.168.10.33還是192.168.10.34,則是由ha_config表中的“current_hostid”來指定的,初使用化時(shí),current_hostid為0,所以會(huì)路由到192.168.10.33上。當(dāng)python程序(后面的內(nèi)容會(huì)講這個(gè)python程序)當(dāng)檢查到192.168.10.33出現(xiàn)問題時(shí),會(huì)把current_hostid改為1,這時(shí)這個(gè)函數(shù)的調(diào)用就會(huì)被路由到192.168.10.34上,從而通過這種方法實(shí)現(xiàn)了HA。

在數(shù)據(jù)節(jié)點(diǎn)192.168.10.33和192.168.10.34上:

create table myuser(id int primary key, name text);

insert into myuser select generate_series(1,10000),'user'||generate_series(1,10000);

create or replace function public.get_username(userid int) returns text as $$

declare

   ret text;

begin

     SELECT name INTO ret FROM myuser where id=userid;

     return ret;

end;

$$ language plpgsql;


后面講解如何通過python腳本來探測后面的數(shù)據(jù)庫的狀態(tài),并自動(dòng)進(jìn)行切換的問題。
python為每一個(gè)后面的數(shù)據(jù)節(jié)點(diǎn)啟動(dòng)一個(gè)線程,每隔一段時(shí)間就更新一個(gè)數(shù)據(jù)節(jié)點(diǎn)上的xdual心跳表,如果更新成功,則表ha_config表中primary_hearttime字段或standby_hearttime字段更新成當(dāng)前時(shí)間,這樣再啟動(dòng)一個(gè)線程,如果發(fā)現(xiàn)primary_hearttime和standby_hearttime時(shí)間與當(dāng)前時(shí)間對比時(shí)超過心跳時(shí)間時(shí),就說明這個(gè)數(shù)據(jù)庫出問題了,如果primary_hearttime和standby_hearttime時(shí)間與當(dāng)前時(shí)間對比進(jìn)都超過了心跳時(shí)間,則說明主備庫出現(xiàn)問題了,則不切換,反之則應(yīng)該切換到對端數(shù)據(jù)庫上。我寫的python程序如下:
#!/usr/bin/env python
# osdba 2011.11.20
# -*- coding:UTF-8

import psycopg2
import threading
import datetime
import time
import random
import signal
import sys
import traceback

g_hauser="hauser"
g_hapass="hauser"
g_proxydb="proxydb"

#g_logfile="/home/postgres/log/pg_error.log"
g_logfile="/home/postgres/plpha/myerror.log"

#心跳時(shí)間,也就是更新心跳表xdual的周期
g_heartbeat_interval = 10

#當(dāng)ha_config表中primary_hearttime和standby_hearttime的值與當(dāng)前時(shí)間超過下面的秒數(shù)后,就切換,此值應(yīng)該要大于g_heartbeat_interval
g_switch_timedelay = 20

#連接數(shù)據(jù)庫的超時(shí)時(shí)間
g_connect_timeout = 10


g_running = True




def myhandle(signum=0, e=0):
    """處理信號的函數(shù)"""
    global g_running
    print "recv sig %d" % (signum)
    g_running = False
   
def errlog(errinfo):
    global g_logfile
    f=open(g_logfile,'a')
    outinfo= time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))+" : plpcluster : "+errinfo+"\n"
    f.write(outinfo)
    print outinfo
    f.close()


def connect_proxydb():
    global g_hauser
    global g_hapass
    global g_proxydb
    global g_connect_timeout
   
    conn = psycopg2.connect("host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb,g_hauser,g_hapass,g_connect_timeout) )
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    return conn

def connect_proxydb():
    global g_runningplpha.py
    global g_heartbeat_interval
    global g_connect_timeout
    errcnt = 0
    while g_running:
        try:
            connstr = "host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb,g_hauser,g_hapass,g_connect_timeout)
            conn = psycopg2.connect(connstr)
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            if errcnt > 0:
                errlog("NOTICE : %s : reconnect successful." % (connstr) )
            break
        except Exception,e:
            errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()) )
            errcnt = errcnt + 1
            time.sleep(g_connect_timeout)
    try:
        return conn
    except:
        return None

#探測數(shù)據(jù)庫的狀態(tài)的線程
class CMonitorHost(threading.Thread):
    def __init__(self, ha_groupid, hostip, isprimary, dbname, user, password):
        threading.Thread.__init__(self)
        self.ha_groupid = ha_groupid
        self.hostip = hostip
        self.isprimary = isprimary
        self.dbname = dbname
        self.user = user
        self.password =  password

    def connectbackend(self):
        global g_running
        global g_heartbeat_interval
        global g_connect_timeout
        errcnt = 0
        connstr = ""
        while g_running:
            try:
                connstr = "host=%s dbname=%s user=%s password=****** connect_timeout= %d" % (self.hostip,self.dbname,self.user,g_connect_timeout)
                conn = psycopg2.connect("host=%s dbname=%s user=%s password=%s connect_timeout=%s" % (self.hostip,self.dbname,self.user,self.password,g_connect_timeout))
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
                if errcnt > 0:
                    errlog("NOTICE : %s : reconnect successful." % (connstr) )
                break
            except Exception,e:
                errcnt = errcnt + 1
                if errcnt < 3:
                    errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()) )
                time.sleep(g_connect_timeout)
        try:
            return conn
        except:
            return None

    def run(self):
        global g_running
        global g_heartbeat_interval
        
        conn = self.connectbackend()
        if not conn:
            return
        cur = conn.cursor()
        
        while g_running :
            try:
                runsql = "update xdual set x=now()"
                cur.execute(runsql)
            except Exception,e:
                errlog("RUN SQL ERROR : %s : %s " % (runsql,traceback.format_exc()))
                try:
                    cur.close()
                    conn.close()
                except :
                    pass
               
                conn = self.connectbackend()
                if  not g_running:
                    break

                if conn:
                    cur = conn.cursor()
                else:
                    break
                time.sleep(g_heartbeat_interval)
                continue
            self.update_cl_state()
            time.sleep(g_heartbeat_interval)

        try:
            cur.close()
            conn.close()
        except:
            pass

    def update_cl_state(self):
        global g_hauser
        global g_hapass
        global g_proxydb
        
        try:
            conn = connect_proxydb()
            cur = conn.cursor()
            if self.isprimary:
                runsql = "update ha_config set primary_hearttime=now() where ha_groupid = %d" % (self.ha_groupid)
                cur.execute("update ha_config set primary_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            else:
                runsql = "update ha_config set standby_hearttime=now() where ha_groupid = %s" % (self.ha_groupid)
                cur.execute("update ha_config set standby_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            cur.close()
            conn.close()
        except Exception,e:
            errlog("RUN SQL ERROR : %s : %s" % (runsql,traceback.format_exc()))

signal.signal(signal.SIGINT, myhandle)
signal.signal(signal.SIGTERM, myhandle)

try:
    conn = connect_proxydb()
except Exception,e:
    errlog("CONNECT ERROR : %s " % (traceback.format_exc()) )
    g_running = True
    time.sleep(1)
    sys.exit()

cur = conn.cursor()
try:
    runsql = "select ha_groupid,primary_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res :
        t = CMonitorHost(res[0], res[1], True, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()

    runsql = "select ha_groupid,standby_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res :
        t = CMonitorHost(res[0], res[1], False, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()

except Exception,e:
    errlog(e.pgcode+" : RUN SQL ERROR : "+runsql+" : "+ e.pgerror)
    g_running = True
    time.sleep(1)
    sys.exit()

time.sleep(2)

# 檢查是否要切換
while g_running:
    try:
        runsql = "SELECT ha_groupid, extract(epoch from (now() - standby_hearttime)) shtime from ha_config where extract(epoch from (now() - primary_hearttime)) > %d and current_hostid=primary_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res :
            if res[1] < g_switch_timedelay:
                errlog("ERROR: ha_groupid(%d) switch to standby database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=standby_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=standby_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid(%d) primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()

        runsql = "SELECT ha_groupid, extract(epoch from (now() - primary_hearttime)) shtime from ha_config where extract(epoch from (now() - standby_hearttime)) > %d and current_hostid=standby_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res :
            if res[1] < g_switch_timedelay:
                errlog("ERROR: ha_groupid: %d switch to primary database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=primary_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=primary_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid: %d primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()
            
    except Exception,e:
        errlog("RUN SQL ERROR : %s : %s" % (runsql,traceback.format_exc()))
        try:
            cur.close()
            conn.close()
        except :
            pass
        
        conn = connect_proxydb()
        cur = conn.cursor()

    time.sleep(g_heartbeat_interval)

cur.close()
conn.close()

這本文之中,沒有講解HA組中主備庫之間的數(shù)據(jù)如何同步,實(shí)踐中主備庫之間的數(shù)據(jù)同步可以使用bucardo做數(shù)據(jù)雙向同步, 也可以使用slony來同步,但由于slony不支持雙master架構(gòu),備庫是不能寫的,所以使用slony時(shí),還需要修改我寫的這個(gè)plpha.py腳本,讓其可以把slony的備庫提升成主庫。當(dāng)然還有一些其它的方案。

另,雖然有兩臺plproxy,但當(dāng)一臺plproxy出現(xiàn)問題時(shí),應(yīng)用如何切換到另一臺plproxy,也需要考慮,在我們公司是使用F5負(fù)載均衡器來實(shí)現(xiàn)的。

論壇徽章:
0
2 [報(bào)告]
發(fā)表于 2011-11-22 12:32 |只看該作者
收藏一份

論壇徽章:
59
2015七夕節(jié)徽章
日期:2015-08-24 11:17:25ChinaUnix專家徽章
日期:2015-07-20 09:19:30每周論壇發(fā)貼之星
日期:2015-07-20 09:19:42ChinaUnix元老
日期:2015-07-20 11:04:38榮譽(yù)版主
日期:2015-07-20 11:05:19巳蛇
日期:2015-07-20 11:05:26CU十二周年紀(jì)念徽章
日期:2015-07-20 11:05:27IT運(yùn)維版塊每日發(fā)帖之星
日期:2015-07-20 11:05:34操作系統(tǒng)版塊每日發(fā)帖之星
日期:2015-07-20 11:05:36程序設(shè)計(jì)版塊每日發(fā)帖之星
日期:2015-07-20 11:05:40數(shù)據(jù)庫技術(shù)版塊每日發(fā)帖之星
日期:2015-07-20 11:05:432015年辭舊歲徽章
日期:2015-07-20 11:05:44
3 [報(bào)告]
發(fā)表于 2011-12-21 21:45 |只看該作者
收藏了。沒有做過雙機(jī)
您需要登錄后才可以回帖 登錄 | 注冊

本版積分規(guī)則 發(fā)表回復(fù)

  

北京盛拓優(yōu)訊信息技術(shù)有限公司. 版權(quán)所有 京ICP備16024965號-6 北京市公安局海淀分局網(wǎng)監(jiān)中心備案編號:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年舉報(bào)專區(qū)
中國互聯(lián)網(wǎng)協(xié)會(huì)會(huì)員  聯(lián)系我們:huangweiwei@itpub.net
感謝所有關(guān)心和支持過ChinaUnix的朋友們 轉(zhuǎn)載本站內(nèi)容請注明原作者名及出處

清除 Cookies - ChinaUnix - Archiver - WAP - TOP