Last edited by osdba on 2011-11-20 17:17
This article is also available on my blog: http://blog.osdba.net/?post=72
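plproxy itself does not provide high availability; a third-party tool is needed for that. This post describes one approach: a simple Python script probes the state of the backend databases and, when it finds that one of them has failed, changes the data routing so that requests are switched to the standby database. The data here is not split horizontally but vertically, that is, by business area: the data of business A lives on the first database and the data of business B on the second. If you want a horizontal split instead, that is also easy: you only need to change the hash function used for routing.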
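Both splitting schemes end up choosing an HA group; only the input differs. The Python sketch below illustrates this. FUNCTION_ROUTE, group_for_function and group_for_key are hypothetical names, and zlib.crc32 merely stands in for whatever hash a real horizontal split would use. In both cases the concrete host inside the group is still picked from ha_config, as described further down.

```python
import zlib

# Vertical split (the scheme used in this post): each function, i.e. each
# business area, is pinned to one HA group, like the function_route table below.
FUNCTION_ROUTE = {"get_username": 1}  # funcname -> ha_groupid


def group_for_function(funcname):
    return FUNCTION_ROUTE[funcname]


# Horizontal split: hash the sharding key and spread keys across the groups.
# zlib.crc32 is only a stand-in for whatever hash a real setup would use.
def group_for_key(key, group_ids):
    h = zlib.crc32(str(key).encode("utf-8")) & 0xFFFFFFFF
    return group_ids[h % len(group_ids)]


print(group_for_function("get_username"))  # 1
print(group_for_key(42, [1, 2]))  # 1 or 2, always the same for a given key
```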
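Machines:
The following two machines act as plproxy proxy databases:
192.168.10.31: proxydb, with plproxy installed, acting as the function proxy
192.168.10.32: proxydb, with plproxy installed, acting as the function proxy
The following 4 machines hold the actual business data: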
192.168.10.33: db01
192.168.10.34: db01
192.168.10.35: db01
192.168.10.36: db01
Following the standard plproxy procedure, create the three standard plproxy functions in the proxy database:
create or replace function plproxy.get_cluster_version(cluster_name text)
returns integer as $$
begin
if cluster_name = 'cluster01' then
return 1;
end if;
raise exception 'no such cluster: %', cluster_name;
end;
$$ language plpgsql;
create or replace function plproxy.get_cluster_config(cluster_name text, out key text, out val text)
returns setof record as $$
begin
key := 'statement_timeout';
val := 60;
return next;
return;
end; $$ language plpgsql;
CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text)
RETURNS SETOF text AS $$
BEGIN
IF cluster_name = 'cluster01' THEN
RETURN NEXT 'dbname=db01 host=192.168.10.33 user=buser password=buser';
RETURN NEXT 'dbname=db01 host=192.168.10.34 user=buser password=buser';
RETURN NEXT 'dbname=db01 host=192.168.10.35 user=buser password=buser';
RETURN NEXT 'dbname=db01 host=192.168.10.36 user=buser password=buser';
RETURN;
END IF;
RAISE EXCEPTION 'Unknown cluster';
END;
$$ LANGUAGE plpgsql;
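Later in this setup the RUN ON function returns a host number directly, so it helps to see how plproxy maps an integer to one of the connection strings above. The sketch below assumes plproxy's rule that the number of partitions must be a power of two and that only the low bits of the value are used (value & (count - 1)); PARTITIONS and partition_for are illustrative names, not plproxy API. With four partitions, 0 selects 192.168.10.33, 1 selects 192.168.10.34, and so on.

```python
# Connection strings in the order returned by plproxy.get_cluster_partitions above.
PARTITIONS = [
    "dbname=db01 host=192.168.10.33 user=buser password=buser",
    "dbname=db01 host=192.168.10.34 user=buser password=buser",
    "dbname=db01 host=192.168.10.35 user=buser password=buser",
    "dbname=db01 host=192.168.10.36 user=buser password=buser",
]


def is_power_of_two(n):
    return n > 0 and (n & (n - 1)) == 0


def partition_for(value, partitions=PARTITIONS):
    # Assumed plproxy rule: the partition count is a power of two and the
    # low bits of the RUN ON value select the partition.
    if not is_power_of_two(len(partitions)):
        raise ValueError("partition count must be a power of 2")
    return partitions[value & (len(partitions) - 1)]


print(partition_for(0))  # dbname=db01 host=192.168.10.33 user=buser password=buser
print(partition_for(1))  # dbname=db01 host=192.168.10.34 user=buser password=buser
```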
This cluster has 4 machines:
192.168.10.33, 192.168.10.34: data for business A; 192.168.10.33 is the primary, 192.168.10.34 the standby.
192.168.10.35, 192.168.10.36: data for business B; 192.168.10.35 is the primary, 192.168.10.36 the standby.
In the proxydb proxy database, create a table describing the primary/standby relationships:
create table ha_config(
ha_groupid int primary key, --one group id is assigned to each primary/standby pair
primary_ip text, --IP of the primary database in the HA group
standby_ip text, --IP of the standby database in the HA group
primary_hostid int, --ID of the primary database, i.e. its position (starting at 0) in plproxy.get_cluster_partitions
standby_hostid int, --ID of the standby database, defined the same way
current_hostid int, --host the HA group is currently served from
primary_hearttime timestamp, --heartbeat time of the primary: the probe found the database healthy at this time
standby_hearttime timestamp); --heartbeat time of the standby, with the same meaning
The data nodes form two primary/standby pairs, so insert two rows:
insert into ha_config values(1,'192.168.10.33','192.168.10.34',0,1,0,now(),now());
insert into ha_config values(2,'192.168.10.35','192.168.10.36',2,3,2,now(),now());
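primary_hostid and standby_hostid only work if they really are the positions of the matching hosts in plproxy.get_cluster_partitions. A small consistency check can be sketched in Python; HA_CONFIG mirrors the two rows inserted above, while host_of and check_ha_config are hypothetical helpers that are not part of the original setup.

```python
# The rows inserted above:
# (ha_groupid, primary_ip, standby_ip, primary_hostid, standby_hostid, current_hostid)
HA_CONFIG = [
    (1, "192.168.10.33", "192.168.10.34", 0, 1, 0),
    (2, "192.168.10.35", "192.168.10.36", 2, 3, 2),
]
# Connection strings in get_cluster_partitions order (hostid = list index).
PARTITIONS = [
    "dbname=db01 host=192.168.10.33 user=buser password=buser",
    "dbname=db01 host=192.168.10.34 user=buser password=buser",
    "dbname=db01 host=192.168.10.35 user=buser password=buser",
    "dbname=db01 host=192.168.10.36 user=buser password=buser",
]


def host_of(connstr):
    """Return the host=... value of a libpq-style 'key=value key=value' string."""
    for item in connstr.split():
        key, _, value = item.partition("=")
        if key == "host":
            return value
    return None


def check_ha_config(rows, partitions):
    """List every row whose hostids do not point at the host with the matching IP."""
    problems = []
    for gid, pip, sip, phid, shid, chid in rows:
        for label, ip, hid in (("primary", pip, phid), ("standby", sip, shid)):
            if not 0 <= hid < len(partitions) or host_of(partitions[hid]) != ip:
                problems.append("group %d: %s hostid %d does not match %s" % (gid, label, hid, ip))
        if chid not in (phid, shid):
            problems.append("group %d: current_hostid %d is neither primary nor standby" % (gid, chid))
    return problems


print(check_ha_config(HA_CONFIG, PARTITIONS))  # []
```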
Create a function routing table that records which HA group each function should be routed to:
create table function_route(
funcname text primary key,
ha_groupid int);
Create the user hauser, which will be used to modify the routing in proxydb:
create user hauser password 'hauser';
grant ALL on ha_config to hauser;
grant ALL on function_route to hauser;
Create the hash function plp_route_by_funcname, which computes the route from current_hostid in the ha_config table:
CREATE OR REPLACE FUNCTION public.plp_route_by_funcname(key text) returns int
AS $$
declare
ret int;
begin
SELECT current_hostid INTO ret FROM function_route a,ha_config b where a.funcname=key and a.ha_groupid=b.ha_groupid;
return ret;
end;
$$ LANGUAGE plpgsql;
On the database of every data node:
Create hauser; the Python program described below uses this user to check whether the database is still alive:
create user hauser password 'hauser';
Create the heartbeat table. The hauser user updates this table; if the update fails, the database is considered down:
create table xdual(x timestamp);
insert into xdual values(now());
grant ALL on xdual to hauser;
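The liveness test is simply whether this update succeeds. The sketch below uses Python's built-in sqlite3 module as a stand-in for a PostgreSQL node so that it runs anywhere; with psycopg2 the shape is the same, catching psycopg2.Error instead. heartbeat is a hypothetical helper. One design choice differs from the script later in this post: it also requires exactly one updated row, so an accidentally emptied xdual table is not mistaken for a healthy node.

```python
import sqlite3


def heartbeat(conn):
    """Try to touch the heartbeat row; True only if exactly one row was updated."""
    try:
        cur = conn.cursor()
        cur.execute("UPDATE xdual SET x = CURRENT_TIMESTAMP")
        conn.commit()
        return cur.rowcount == 1
    except sqlite3.Error:
        # Any database error (missing table, closed or broken connection) means "not alive".
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE xdual(x timestamp)")
conn.execute("INSERT INTO xdual VALUES (CURRENT_TIMESTAMP)")
print(heartbeat(conn))  # True
conn.close()
print(heartbeat(conn))  # False: the closed connection plays the role of a dead node
```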
In proxydb, create a test function, get_username, which returns the user name for a given userid:
CREATE OR REPLACE FUNCTION public.get_username(userid int) returns text
AS $$
CLUSTER 'cluster01';
RUN ON plp_route_by_funcname('get_username');
$$ LANGUAGE plproxy;
Insert a record into function_route:
insert into function_route values('get_username',1);
This record routes calls to get_username to HA group 1, i.e. the group made up of 192.168.10.33 and 192.168.10.34. Whether a call actually goes to 192.168.10.33 or to 192.168.10.34 is determined by current_hostid in the ha_config table. At initialization current_hostid is 0, so calls are routed to 192.168.10.33. When the Python program (described below) detects a problem with 192.168.10.33, it changes current_hostid to 1, and from then on calls to this function are routed to 192.168.10.34. This is how HA is achieved.
On data nodes 192.168.10.33 and 192.168.10.34:
create table myuser(id int primary key, name text);
insert into myuser select generate_series(1,10000),'user'||generate_series(1,10000);
create or replace function public.get_username(userid int) returns text as $$
declare
ret text;
begin
SELECT name INTO ret FROM myuser where id=userid;
return ret;
end;
$$ language plpgsql;
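The rest of this post explains how a Python script probes the state of the backend databases and switches over automatically.
The Python program starts one thread per backend data node. Each thread updates the xdual heartbeat table on its node at a fixed interval; if the update succeeds, it sets the primary_hearttime or standby_hearttime column of the corresponding ha_config row to the current time. A second loop (in the script below it runs in the main thread) compares primary_hearttime and standby_hearttime with the current time. If the heartbeat of the database currently in use is older than the switch threshold (g_switch_timedelay), that database is considered to have a problem: if the heartbeat of the other database in the group is still fresh, routing is switched to that peer; if both heartbeats are stale, both primary and standby have failed and nothing is switched. My Python program is as follows: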
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# osdba 2011.11.20
import psycopg2
import psycopg2.extensions
import threading
import time
import signal
import sys
import traceback
g_hauser="hauser"
g_hapass="hauser"
g_proxydb="proxydb"
#g_logfile="/home/postgres/log/pg_error.log"
g_logfile="/home/postgres/plpha/myerror.log"
# Heartbeat interval in seconds, i.e. how often the xdual heartbeat table is updated
g_heartbeat_interval = 10
# Switch over when primary_hearttime/standby_hearttime in ha_config is older than this
# many seconds; must be larger than g_heartbeat_interval
g_switch_timedelay = 20
# Timeout in seconds for connecting to a database
g_connect_timeout = 10
g_running = True
def myhandle(signum=0, e=0):
    """Signal handler: tell every loop in the program to stop."""
    global g_running
    print("recv sig %d" % (signum))
    g_running = False

def errlog(errinfo):
    """Append a timestamped message to g_logfile and echo it to stdout."""
    outinfo = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " : plpcluster : " + errinfo + "\n"
    with open(g_logfile, 'a') as f:
        f.write(outinfo)
    print(outinfo)
def connect_proxydb():
    """Connect to the local proxydb as hauser, retrying until it succeeds.
    Returns None only if the program was told to stop while retrying."""
    errcnt = 0
    conn = None
    connstr = "host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb, g_hauser, g_hapass, g_connect_timeout)
    # same string for the log, with the password masked
    logstr = "host=127.0.0.1 dbname=%s user=%s password=****** connect_timeout=%s" % (g_proxydb, g_hauser, g_connect_timeout)
    while g_running:
        try:
            conn = psycopg2.connect(connstr)
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            if errcnt > 0:
                errlog("NOTICE : %s : reconnect successful." % (logstr))
            break
        except Exception:
            conn = None
            errlog("CONNECT ERROR : %s : %s " % (logstr, traceback.format_exc()))
            errcnt = errcnt + 1
            time.sleep(g_connect_timeout)
    return conn
# Thread that probes the state of one backend database
class CMonitorHost(threading.Thread):
    def __init__(self, ha_groupid, hostip, isprimary, dbname, user, password):
        threading.Thread.__init__(self)
        self.ha_groupid = ha_groupid
        self.hostip = hostip
        self.isprimary = isprimary
        self.dbname = dbname
        self.user = user
        self.password = password

    def connectbackend(self):
        """Connect to the backend database, retrying until it succeeds or the
        program is told to stop (then None is returned). Only the first two
        failures are logged so that a dead host does not flood the log."""
        errcnt = 0
        conn = None
        # connection string for the log, with the password masked
        connstr = "host=%s dbname=%s user=%s password=****** connect_timeout=%d" % (self.hostip, self.dbname, self.user, g_connect_timeout)
        while g_running:
            try:
                conn = psycopg2.connect("host=%s dbname=%s user=%s password=%s connect_timeout=%s" % (self.hostip, self.dbname, self.user, self.password, g_connect_timeout))
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
                if errcnt > 0:
                    errlog("NOTICE : %s : reconnect successful." % (connstr))
                break
            except Exception:
                conn = None
                errcnt = errcnt + 1
                if errcnt < 3:
                    errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()))
                time.sleep(g_connect_timeout)
        return conn
    def run(self):
        """Update xdual on the backend every g_heartbeat_interval seconds and
        report each successful update to proxydb."""
        conn = self.connectbackend()
        if not conn:
            return
        cur = conn.cursor()
        runsql = "update xdual set x=now()"
        while g_running:
            try:
                cur.execute(runsql)
            except Exception:
                errlog("RUN SQL ERROR : %s : %s " % (runsql, traceback.format_exc()))
                try:
                    cur.close()
                    conn.close()
                except Exception:
                    pass
                # no heartbeat is reported to proxydb while the backend is unreachable
                conn = self.connectbackend()
                if not g_running or not conn:
                    break
                cur = conn.cursor()
                time.sleep(g_heartbeat_interval)
                continue
            self.update_cl_state()
            time.sleep(g_heartbeat_interval)
        try:
            cur.close()
            conn.close()
        except Exception:
            pass
    def update_cl_state(self):
        """Record in proxydb.ha_config that this host answered the heartbeat just now."""
        if self.isprimary:
            runsql = "update ha_config set primary_hearttime=now() where ha_groupid = %s"
        else:
            runsql = "update ha_config set standby_hearttime=now() where ha_groupid = %s"
        conn = None
        try:
            conn = connect_proxydb()
            cur = conn.cursor()
            cur.execute(runsql, (self.ha_groupid,))
            cur.close()
        except Exception:
            errlog("RUN SQL ERROR : %s : %s" % (runsql % (self.ha_groupid), traceback.format_exc()))
        finally:
            if conn is not None:
                conn.close()
signal.signal(signal.SIGINT, myhandle)
signal.signal(signal.SIGTERM, myhandle)
try:
    conn = connect_proxydb()
except Exception:
    errlog("CONNECT ERROR : %s " % (traceback.format_exc()))
    time.sleep(1)
    sys.exit(1)
if conn is None:  # stopped by a signal before proxydb became reachable
    sys.exit(1)
cur = conn.cursor()
# Start one monitor thread for every primary and one for every standby database
try:
    runsql = "select ha_groupid,primary_ip from ha_config"
    cur.execute(runsql)
    for res in cur.fetchall():
        t = CMonitorHost(res[0], res[1], True, "db01", g_hauser, g_hapass)
        t.start()
    runsql = "select ha_groupid,standby_ip from ha_config"
    cur.execute(runsql)
    for res in cur.fetchall():
        t = CMonitorHost(res[0], res[1], False, "db01", g_hauser, g_hapass)
        t.start()
except Exception:
    errlog("RUN SQL ERROR : %s : %s" % (runsql, traceback.format_exc()))
    g_running = False  # tell the threads already started to stop
    time.sleep(1)
    sys.exit(1)
time.sleep(2)
# Main loop: check whether any HA group has to be switched
while g_running:
    try:
        # Groups served by the primary whose primary heartbeat is too old
        runsql = "SELECT ha_groupid, extract(epoch from (now() - standby_hearttime)) shtime from ha_config where extract(epoch from (now() - primary_hearttime)) > %d and current_hostid=primary_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        for res in cur.fetchall():
            if res[1] < g_switch_timedelay:
                # the standby is still alive: route the group to it
                errlog("ERROR: ha_groupid(%d) switch to standby database!" % (res[0]))
                runsql = "update ha_config set current_hostid=standby_hostid where ha_groupid=%s" % (res[0])
                cur2 = conn.cursor()
                cur2.execute("update ha_config set current_hostid=standby_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid(%d) primary and standby all failed!!!" % (res[0]))
        # Groups served by the standby whose standby heartbeat is too old
        runsql = "SELECT ha_groupid, extract(epoch from (now() - primary_hearttime)) phtime from ha_config where extract(epoch from (now() - standby_hearttime)) > %d and current_hostid=standby_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        for res in cur.fetchall():
            if res[1] < g_switch_timedelay:
                # the primary is alive: route the group back to it
                errlog("ERROR: ha_groupid: %d switch to primary database!" % (res[0]))
                runsql = "update ha_config set current_hostid=primary_hostid where ha_groupid=%s" % (res[0])
                cur2 = conn.cursor()
                cur2.execute("update ha_config set current_hostid=primary_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid: %d primary and standby all failed!!!" % (res[0]))
    except Exception:
        errlog("RUN SQL ERROR : %s : %s" % (runsql, traceback.format_exc()))
        try:
            cur.close()
            conn.close()
        except Exception:
            pass
        conn = connect_proxydb()
        if conn is None:  # stopped by a signal while reconnecting
            break
        cur = conn.cursor()
    time.sleep(g_heartbeat_interval)
if conn is not None:
    cur.close()
    conn.close()
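The decision the main loop makes for each HA group can be isolated as a pure function, which makes the rules easy to test without any database. switch_decision is a hypothetical helper, not part of plpha.py: ages are seconds since the last successful heartbeat, and timedelay plays the role of g_switch_timedelay. The comparisons mirror the script's two SELECT statements (> for the host currently in use, < for its peer).

```python
def switch_decision(current_hostid, primary_hostid, standby_hostid,
                    primary_age, standby_age, timedelay=20):
    """Return the hostid the group should be routed to, or None to keep the route.

    A valid answer can be 0, so callers must test "is not None".
    """
    if current_hostid == primary_hostid:
        active_age, peer_age, peer = primary_age, standby_age, standby_hostid
    elif current_hostid == standby_hostid:
        active_age, peer_age, peer = standby_age, primary_age, primary_hostid
    else:
        return None  # inconsistent row: leave it for a human
    if active_age <= timedelay:
        return None  # the host in use is still healthy
    if peer_age < timedelay:
        return peer  # host in use is stale but its peer is alive: switch
    return None      # both stale: the script only logs "primary and standby all failed"


# Group 1 served by its primary (hostid 0), primary silent for 35 s, standby fresh.
print(switch_decision(0, 0, 1, primary_age=35.0, standby_age=4.0))  # 1
```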
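This post does not cover how data is synchronized between the primary and the standby of an HA group. In practice you can use bucardo for bidirectional (multi-master) replication, or slony. Slony does not support a multi-master setup, so its standby cannot be written to; when using slony, the plpha.py script above therefore also has to be modified so that it can promote the slony standby to primary. There are of course other options as well.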
Also, although there are two plproxy machines, you still have to work out how the application switches to the other plproxy when one of them fails. At our company this is done with an F5 load balancer.
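Without a load balancer, a cruder alternative is to let the application try the proxies in order. This is a hedged sketch, not what the author's company uses: connect_first_available and fake_connect are hypothetical helpers, and connect is any callable that opens a connection and raises on failure.

```python
def connect_first_available(hosts, connect):
    """Try each plproxy host in order and return (host, connection) for the first that works."""
    errors = []
    for host in hosts:
        try:
            return host, connect(host)
        except Exception as exc:  # with psycopg2 you would catch psycopg2.OperationalError
            errors.append("%s: %s" % (host, exc))
    raise RuntimeError("no plproxy host reachable: " + "; ".join(errors))


PROXIES = ["192.168.10.31", "192.168.10.32"]


def fake_connect(host):
    # Stand-in for a real driver call: pretend the first proxy is down.
    if host == "192.168.10.31":
        raise IOError("connection refused")
    return "conn-to-" + host


print(connect_first_available(PROXIES, fake_connect))  # ('192.168.10.32', 'conn-to-192.168.10.32')
```

In real use, connect could wrap psycopg2.connect(host=h, dbname="proxydb", connect_timeout=5) plus the appropriate user and password. This only covers connection setup: calls already in flight on a failed proxy still fail. libpq from PostgreSQL 10 onwards can also do the ordering itself when given several hosts, e.g. host=192.168.10.31,192.168.10.32.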