亚洲av成人无遮挡网站在线观看,少妇性bbb搡bbb爽爽爽,亚洲av日韩精品久久久久久,兔费看少妇性l交大片免费,无码少妇一区二区三区

Chinaunix

標題: 在PostgreSQL+plproxy中通過修改數(shù)據(jù)路由的方法實現(xiàn)的高可用方案 [打印本頁]

作者: osdba    時間: 2011-11-20 17:10
標題: 在PostgreSQL+plproxy中通過修改數(shù)據(jù)路由的方法實現(xiàn)的高可用方案
本帖最后由 osdba 于 2011-11-20 17:17 編輯

此文章也可見我的blog: http://blog.osdba.net/?post=72

    我們知道plproxy本身并不提供高可用方案。需要使用第三方的軟件來實現(xiàn)高可用方案。這里提供了一種通過寫一個簡單的python腳本的來探測后面的數(shù)據(jù)庫的狀態(tài),如果發(fā)現(xiàn)后面的數(shù)據(jù)庫壞了,就讓其修改數(shù)據(jù)路由,將其切換到備數(shù)據(jù)庫的方法。這里提供的方案數(shù)據(jù)并不做水平拆分,而是做垂直拆分的方案。也就是按業(yè)務(wù)拆分數(shù)據(jù),A業(yè)務(wù)的數(shù)據(jù)放在第一臺數(shù)據(jù)庫上,B業(yè)務(wù)的數(shù)據(jù)放到第二臺數(shù)據(jù)庫上。當然,如果想做數(shù)據(jù)水平拆分,只要修改一下數(shù)據(jù)路由的hash函數(shù),也是很容易做到的。

機器說明:

下面兩臺機器做plproxy代理庫

192.168.10.31: proxydb,安裝plproxy,做函數(shù)代理

192.168.10.32: proxydb,安裝plproxy,做函數(shù)代理

下面4臺機器放真實的業(yè)務(wù)數(shù)據(jù)
192.168.10.33: db01

192.168.10.34: db01

192.168.10.35: db01

192.168.10.36: db01


按plproxy的標準方法,在plproxy代理庫,建plproxy的3個標準函數(shù):

create or replace function plproxy.get_cluster_version(cluster_name text)

returns integer as $$

begin

    if cluster_name = 'cluster01' then

         return 1;

    end if;

    raise exception 'no such cluster: %', cluster_name;

end;

$$ language plpgsql;



create or replace function plproxy.get_cluster_config(cluster_name text, out key text, out val text)

returns setof record as $$

begin

    key := 'statement_timeout';

    val := 60;

    return next;

    return;

end; $$ language plpgsql;


CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text)

RETURNS SETOF text AS $$

BEGIN

    IF cluster_name = 'cluster01' THEN

        RETURN NEXT 'dbname=db01 host=192.168.10.33 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.34 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.35 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.36 user=buser password=buser';

        RETURN;

    END IF;

    RAISE EXCEPTION 'Unknown cluster';

END;

$$ LANGUAGE plpgsql;

在這個集群中,有4臺機器:

192.168.10.33 192.168.10.34 :  放業(yè)務(wù)A的數(shù)據(jù),192.168.10.33為主庫,192.168.10.34為備庫

192.168.10.35 192.168.10.36:放業(yè)務(wù)B的數(shù)據(jù),192.168.10.35為主庫,192.168.10.36為備庫。

在proxydb代理庫上建主備庫的關(guān)系表:

create table ha_config(

ha_groupid int primary key, --每一對主備庫,分配一個組id,

primary_ip text, --HA組中主數(shù)據(jù)庫的IP

standby_ip text,--HA組中備數(shù)據(jù)庫的IP

primary_hostid int,--HA組中主數(shù)據(jù)庫的ID,也就是在plproxy.get_cluster_partitions函數(shù)中主機的順序號

standby_hostid int,--HA組中備數(shù)據(jù)庫的ID,

current_hostid int,--當前HA在哪臺機器上

primary_hearttime timestamp, --主數(shù)據(jù)庫的心跳時間,表明探測程序在這個時間探測時,數(shù)據(jù)庫是好的。

standby_hearttime timestamp);--備數(shù)據(jù)庫的心跳時間,表明探測程序在這個時間探測時,數(shù)據(jù)庫是好的。

數(shù)據(jù)節(jié)點有兩組互為主備的節(jié)點,插入兩行數(shù)據(jù):

insert into ha_config values(1,'192.168.10.33','192.168.10.34',0,1,0,now(),now());

insert into ha_config values(2,'192.168.10.35','192.168.10.36',2,3,2,now(),now());



建一個函數(shù)路由表,表明這個這個函數(shù)應(yīng)該被路由到哪個HA組上的主機上:

create table function_route(

funcname text primary key,

ha_groupid int);

建hauser用戶,此用戶用來在proxydb上修改路由:

create user hauser password 'hauser';

grant ALL on ha_config to hauser;

grant ALL on function_route to hauser;



建可以根據(jù)ha_config表中current_hostid來計算路由的hash函數(shù)plp_route_by_funcname:

CREATE OR REPLACE FUNCTION public.plp_route_by_funcname(key text) returns int

AS $$

declare

ret int;

begin

     SELECT current_hostid INTO ret FROM function_route a,ha_config b where a.funcname=key and a.ha_groupid=b.ha_groupid;

     return ret;

end;

$$ LANGUAGE plpgsql;


在每個數(shù)據(jù)節(jié)點的數(shù)據(jù)庫上:

建hauser,后面的python程序用此用戶來探測這個數(shù)據(jù)庫是否還活著:

create user hauser password 'hauser';

建立心跳表,hauser用戶來更新這個表,如果不能更新,則說明這個數(shù)據(jù)庫壞了:

create table xdual(x timestamp);

insert into xdual values(now());

grant ALL on xdual to hauser;



在proxydb上建一個測試函數(shù):get_username,此函數(shù)根據(jù)userid得到用戶名:

CREATE OR REPLACE FUNCTION public.get_username(userid int) returns text

AS $$

CLUSTER 'cluster01';

RUN ON plp_route_by_funcname('get_username');

$$ LANGUAGE plproxy;



在function_route中插入記錄:

insert into function_route values('get_username',1);

這條記錄是把函數(shù)get_username的調(diào)用路由到了HA組1中,也就是“192.168.10.33,192.168.10.34”的HA組中,具體是路由到192.168.10.33還是192.168.10.34,則是由ha_config表中的“current_hostid”來指定的,初使用化時,current_hostid為0,所以會路由到192.168.10.33上。當python程序(后面的內(nèi)容會講這個python程序)當檢查到192.168.10.33出現(xiàn)問題時,會把current_hostid改為1,這時這個函數(shù)的調(diào)用就會被路由到192.168.10.34上,從而通過這種方法實現(xiàn)了HA。

在數(shù)據(jù)節(jié)點192.168.10.33和192.168.10.34上:

create table myuser(id int primary key, name text);

insert into myuser select generate_series(1,10000),'user'||generate_series(1,10000);

create or replace function public.get_username(userid int) returns text as $$

declare

   ret text;

begin

     SELECT name INTO ret FROM myuser where id=userid;

     return ret;

end;

$$ language plpgsql;


后面講解如何通過python腳本來探測后面的數(shù)據(jù)庫的狀態(tài),并自動進行切換的問題。
python為每一個后面的數(shù)據(jù)節(jié)點啟動一個線程,每隔一段時間就更新一個數(shù)據(jù)節(jié)點上的xdual心跳表,如果更新成功,則表ha_config表中primary_hearttime字段或standby_hearttime字段更新成當前時間,這樣再啟動一個線程,如果發(fā)現(xiàn)primary_hearttime和standby_hearttime時間與當前時間對比時超過心跳時間時,就說明這個數(shù)據(jù)庫出問題了,如果primary_hearttime和standby_hearttime時間與當前時間對比進都超過了心跳時間,則說明主備庫出現(xiàn)問題了,則不切換,反之則應(yīng)該切換到對端數(shù)據(jù)庫上。我寫的python程序如下:
#!/usr/bin/env python
# osdba 2011.11.20
# -*- coding:UTF-8

import psycopg2
import threading
import datetime
import time
import random
import signal
import sys
import traceback

g_hauser="hauser"
g_hapass="hauser"
g_proxydb="proxydb"

#g_logfile="/home/postgres/log/pg_error.log"
g_logfile="/home/postgres/plpha/myerror.log"

#心跳時間,也就是更新心跳表xdual的周期
g_heartbeat_interval = 10

#當ha_config表中primary_hearttime和standby_hearttime的值與當前時間超過下面的秒數(shù)后,就切換,此值應(yīng)該要大于g_heartbeat_interval
g_switch_timedelay = 20

#連接數(shù)據(jù)庫的超時時間
g_connect_timeout = 10


g_running = True




def myhandle(signum=0, e=0):
    """處理信號的函數(shù)"""
    global g_running
    print "recv sig %d" % (signum)
    g_running = False
   
def errlog(errinfo):
    global g_logfile
    f=open(g_logfile,'a')
    outinfo= time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))+" : plpcluster : "+errinfo+"\n"
    f.write(outinfo)
    print outinfo
    f.close()


def connect_proxydb():
    global g_hauser
    global g_hapass
    global g_proxydb
    global g_connect_timeout
   
    conn = psycopg2.connect("host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb,g_hauser,g_hapass,g_connect_timeout) )
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    return conn

def connect_proxydb():
    global g_runningplpha.py
    global g_heartbeat_interval
    global g_connect_timeout
    errcnt = 0
    while g_running:
        try:
            connstr = "host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb,g_hauser,g_hapass,g_connect_timeout)
            conn = psycopg2.connect(connstr)
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            if errcnt > 0:
                errlog("NOTICE : %s : reconnect successful." % (connstr) )
            break
        except Exception,e:
            errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()) )
            errcnt = errcnt + 1
            time.sleep(g_connect_timeout)
    try:
        return conn
    except:
        return None

#探測數(shù)據(jù)庫的狀態(tài)的線程
class CMonitorHost(threading.Thread):
    def __init__(self, ha_groupid, hostip, isprimary, dbname, user, password):
        threading.Thread.__init__(self)
        self.ha_groupid = ha_groupid
        self.hostip = hostip
        self.isprimary = isprimary
        self.dbname = dbname
        self.user = user
        self.password =  password

    def connectbackend(self):
        global g_running
        global g_heartbeat_interval
        global g_connect_timeout
        errcnt = 0
        connstr = ""
        while g_running:
            try:
                connstr = "host=%s dbname=%s user=%s password=****** connect_timeout= %d" % (self.hostip,self.dbname,self.user,g_connect_timeout)
                conn = psycopg2.connect("host=%s dbname=%s user=%s password=%s connect_timeout=%s" % (self.hostip,self.dbname,self.user,self.password,g_connect_timeout))
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
                if errcnt > 0:
                    errlog("NOTICE : %s : reconnect successful." % (connstr) )
                break
            except Exception,e:
                errcnt = errcnt + 1
                if errcnt < 3:
                    errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()) )
                time.sleep(g_connect_timeout)
        try:
            return conn
        except:
            return None

    def run(self):
        global g_running
        global g_heartbeat_interval
        
        conn = self.connectbackend()
        if not conn:
            return
        cur = conn.cursor()
        
        while g_running :
            try:
                runsql = "update xdual set x=now()"
                cur.execute(runsql)
            except Exception,e:
                errlog("RUN SQL ERROR : %s : %s " % (runsql,traceback.format_exc()))
                try:
                    cur.close()
                    conn.close()
                except :
                    pass
               
                conn = self.connectbackend()
                if  not g_running:
                    break

                if conn:
                    cur = conn.cursor()
                else:
                    break
                time.sleep(g_heartbeat_interval)
                continue
            self.update_cl_state()
            time.sleep(g_heartbeat_interval)

        try:
            cur.close()
            conn.close()
        except:
            pass

    def update_cl_state(self):
        global g_hauser
        global g_hapass
        global g_proxydb
        
        try:
            conn = connect_proxydb()
            cur = conn.cursor()
            if self.isprimary:
                runsql = "update ha_config set primary_hearttime=now() where ha_groupid = %d" % (self.ha_groupid)
                cur.execute("update ha_config set primary_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            else:
                runsql = "update ha_config set standby_hearttime=now() where ha_groupid = %s" % (self.ha_groupid)
                cur.execute("update ha_config set standby_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            cur.close()
            conn.close()
        except Exception,e:
            errlog("RUN SQL ERROR : %s : %s" % (runsql,traceback.format_exc()))

signal.signal(signal.SIGINT, myhandle)
signal.signal(signal.SIGTERM, myhandle)

try:
    conn = connect_proxydb()
except Exception,e:
    errlog("CONNECT ERROR : %s " % (traceback.format_exc()) )
    g_running = True
    time.sleep(1)
    sys.exit()

cur = conn.cursor()
try:
    runsql = "select ha_groupid,primary_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res :
        t = CMonitorHost(res[0], res[1], True, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()

    runsql = "select ha_groupid,standby_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res :
        t = CMonitorHost(res[0], res[1], False, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()

except Exception,e:
    errlog(e.pgcode+" : RUN SQL ERROR : "+runsql+" : "+ e.pgerror)
    g_running = True
    time.sleep(1)
    sys.exit()

time.sleep(2)

# 檢查是否要切換
while g_running:
    try:
        runsql = "SELECT ha_groupid, extract(epoch from (now() - standby_hearttime)) shtime from ha_config where extract(epoch from (now() - primary_hearttime)) > %d and current_hostid=primary_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res :
            if res[1] < g_switch_timedelay:
                errlog("ERROR: ha_groupid(%d) switch to standby database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=standby_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=standby_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid(%d) primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()

        runsql = "SELECT ha_groupid, extract(epoch from (now() - primary_hearttime)) shtime from ha_config where extract(epoch from (now() - standby_hearttime)) > %d and current_hostid=standby_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res :
            if res[1] < g_switch_timedelay:
                errlog("ERROR: ha_groupid: %d switch to primary database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=primary_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=primary_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid: %d primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()
            
    except Exception,e:
        errlog("RUN SQL ERROR : %s : %s" % (runsql,traceback.format_exc()))
        try:
            cur.close()
            conn.close()
        except :
            pass
        
        conn = connect_proxydb()
        cur = conn.cursor()

    time.sleep(g_heartbeat_interval)

cur.close()
conn.close()

這本文之中,沒有講解HA組中主備庫之間的數(shù)據(jù)如何同步,實踐中主備庫之間的數(shù)據(jù)同步可以使用bucardo做數(shù)據(jù)雙向同步, 也可以使用slony來同步,但由于slony不支持雙master架構(gòu),備庫是不能寫的,所以使用slony時,還需要修改我寫的這個plpha.py腳本,讓其可以把slony的備庫提升成主庫。當然還有一些其它的方案。

另,雖然有兩臺plproxy,但當一臺plproxy出現(xiàn)問題時,應(yīng)用如何切換到另一臺plproxy,也需要考慮,在我們公司是使用F5負載均衡器來實現(xiàn)的。
作者: zhuomingliang    時間: 2011-11-22 12:32
收藏一份
作者: renxiao2003    時間: 2011-12-21 21:45
收藏了。沒有做過雙機




歡迎光臨 Chinaunix (http://72891.cn/) Powered by Discuz! X3.2