歡迎您光臨本站 註冊首頁

Heartbeat 通信層結構分析

←手機掃碼閱讀     火星人 @ 2014-03-12 , reply:0
  前言
  Heartbeat是Linux-HA開源項目發布的用於關鍵應用環境的HA軟體名稱。從1999以來到現在, 歷經1.2.x, 2.0.x等多個版本,在全球開源HA領域具有舉足輕重的知名度, 應用日益廣泛, 並且得到了一些主流Linux操作系統廠商的支持。
而通信層實現無疑是集群軟體運行的最基本底層支撐。本文對通過分析Heartbeat源碼,對其通信層基本結構和機制進行了分析和闡述。給出了基本數據結構和實現流程。
  所有分析基於Heartbeat 2.0.4版本。
相關源碼: http://www.linux-ha.org/download/heartbeat-2.0.4.tar.gz


Heartbeat通信結構概述
  主要分2種:
  1.HBcomm 通信層PLUGIN (節點之間的進程通信)
實現主要是在各個媒介的Plugin里,通過PILS動態連接庫載入。比如支持多播,單播,串口等通信方式。所有節點間通信PLUGIN模塊放在lib/plugins/hbcomm/路徑下。
  2.Unix Domain Socket (節點內的進程通信)
  /include/clplumbing/Ipc.h, IPC抽象層數據結構定義
  /lib/clplumbing/ocf_ipc.c, IPC底層抽象實現
  /lib/clplumbing/ipcsocket.c, IPC的unix域套接字具體實現

交叉點:
  節點間和節點內2種通信方式的交接點在heartbeat.c的read_child(), write_child()等函數中, 在這裡實現消息的轉發。
Heartbeat API
  是基於ipc抽象層的Unix域實現基礎上,用於滿足heartbeat和client子模塊之間的應用層通信需求。:
  client_lib.c實現了Heartbeat API 客戶端部分。
  hb_api.c實現了heartbeat API伺服器端部分。




  圖1:Heartbeat通信結構概圖
上圖描述了一個client子模塊把消息通過Heartbeat通信機制發送到另一個節點相同模塊的過程。
  1. client子模塊通過FIFO管道把消息發送到FIFO子進程fifo_child。為什麼使用FIFO來進行通信呢,應該是有些進程不能很方便的和 Heartbeat主進程建立Unix域IPC通道的關係,比如執行的腳本和集群管理程序, 集群狀態查詢程序。
  2. FIFO子進程通過msgfromstream()從fifo管道收到消息后,利用事先建立好的和Heartbeat之間的IPC通道轉發給Heartbeat主進程
  3. 主進程判斷消息是發給自己的則調用process_msg()進行處理,否則調用send_to_all_media()通過各個媒介的wchan通道發送給write_child子進程。
  4. write_child子進程通過ipcmsgfromIPC()從主進程收到消息,調用各個媒介結構hb_media的write函數把消息發送到集群其他節點。
  5. 其他節點的read_child子進程通過各個媒介結構hb_media的read函數讀到消息后,使用事先和Heartbeat主進程建立的IPC通道發送消息到Heartbeat主進程
  6. Heartbeat主進程通過msgfromIPC()收到消息后,調用process_clustermsg()函數進行處理。具體為,如果是主進程處 理的消息調用HBDoMsgCallback進行處理,否則通過newstartha_monitor發送到各個client子進程


節點間通信Plugin
  代碼在lib/plugins/hbcomm/目錄中
  bcast.c /* 廣播 */
  mcast.c /* 多播 */
  ucast.c /* 單播 */
  openais.c /* openais */
  serial.c /* 串口 */
  ping.c /* icmp */
  ping_group.c /* ping一組主機 */
  hbaping.c /* 光纖匯流排適配器ping */

/* 這個結構的每個函數對應Plugin里的具體函數。*/
struct hb_media_fns {
struct hb_media*(*new) (const char * token); /* 建立媒介 */
int (*parse) (const char * options); /* 讀取配置文件參數 */
int (*open) (struct hb_media *mp); /* 打開 */
int (*close) (struct hb_media *mp); /* 關閉 */
void* (*read) (struct hb_media *mp, int *len ); /* 讀 */
int (*write) (struct hb_media *mp , void *msg, int len); /* 寫 */
int (*mtype) (char **buffer); /* 獲取媒介類型 */
int (*descr) (char **buffer); /* 獲取媒介描述 */
int (*isping) (void); /* 是否ping類型媒介 */
};

hb_media_fns各功能函數調用之處:
new(): config.c的add_option函數
parse(): config.c的parse_config函數
open(): heartbeat.c的initialize_heartbeat函數
close(): heartbeat.c的initialize_heartbeat函數
read(): heartbeat.c的read_child函數
write(): heartbeat.c的write_child函數
mtype(): config.c的parse_config函數
descr(): config.c的parse_config函數
isping(): 在config.c和hb_api.c被調用



節點內IPC通信
IPC通信抽象層(include\clplumbing\ipc.h)
IPC抽象層數據結構概述:
(註:縮進的為該數據結構所屬元素)
IPC_AUTH /* 安全認證數據結構 */
IPC_WAIT_CONNECTION /* 等待連接數據結構 */
IPC_WAIT_OPS /* 等待連接函數集 */
IPC_CHANNEL /* 通信管道數據結構 */
IPC_OPS /* 通信管道函數集 */
IPC_QUEUE /* 信息隊列 */
ipc_bufpool /* 接收緩衝池,經處理轉化為接收隊列 */
IPC_MESSAGE /* IPC通信信息數據結構 */
IPC_CHANNEL /* 信息所屬通信管道 */
SOCKET_MSG_HEAD /* 信息頭數據結構 */


其中2種主要抽象數據結構:
/* server端等待客戶端的連接 */
struct IPC_WAIT_CONNECTION{
int ch_status; /* wait conn. status.*/
void * ch_private; /* wait conn. private data. */
IPC_WaitOps *ops; /* wait conn. function table .*/
};

/* 活動的通信管道結構 */
struct IPC_CHANNEL{
int ch_status; /* 通道狀態 */
pid_t farside_pid; /* 遠端 pid */
void* ch_private; /* channel private data. (may contain conn. info.) */
IPC_Ops* ops; /* 通道函數集 */
unsigned int msgpad; /* 信息前綴位元組數 */
unsigned int bytes_remaining; /* 剩餘未發送的位元組數 */
gboolean should_send_block; /* */

/* private: */
IPC_Queue* send_queue; /* 發送緩衝 */
IPC_Queue* recv_queue; /* 接收緩衝 */

/* 接收緩衝池, 經處理後轉化為接收信息隊列recv_queue */
struct ipc_bufpool* pool; /* buffer pool */

/* 發送的流量控制 */
int high_flow_mark;
int low_flow_mark;
void* high_flow_userdata;
void* low_flow_userdata;
flow_callback_t high_flow_callback;
flow_callback_t low_flow_callback;

int conntype;
char failreason[MAXFAILREASON];
};


IPC抽象層通信
server端:
1. 調用ipc_wait_conn_constructor()建立等待連接管道,成功則返回IPC_WaitConnection.
2. 通過poll/select來輪詢客戶請求。使用accept_connection接受連接,返回IPC_Channel。

client端:
調用ipc_channel_constructor()連接server, 返回IPC_Channel。


IPC抽象層的UNIX Domain Socket實現

static struct IPC_OPS socket_ops = {
destroy: socket_destroy_channel, /* 刪除通信管道 */
initiate_connection: socket_initiate_connection, /* 從client端建立連接 */
verify_auth: socket_verify_auth, /* 客戶端認證信息 */
assert_auth: socket_assert_auth, /* 斷言認證, (未用)*/
send: socket_send, /* 向管道發送信息 */
recv: socket_recv, /* 從管道接收信息*/
waitin: socket_waitin, /* 等待輸入信息, (然後讀取) */
waitout: socket_waitout, /* 等待信息輸出結束 */
is_message_pending: socket_is_message_pending, /* 有信息可讀或掛斷 */
is_sending_blocked: socket_is_output_pending, /* 輸出是否阻塞 */
resume_io: socket_resume_io, /* 恢復所有可能的ipc操作 */
get_send_select_fd: socket_get_send_fd, /* 取得發送fd */
get_recv_select_fd: socket_get_recv_fd, /* 取得接收fd */
set_send_qlen: socket_set_send_qlen, /* 設置最大發送緩衝長度 */
set_recv_qlen: socket_set_recv_qlen, /* 設置最大接收緩衝長度*/
set_high_flow_callback: socket_set_high_flow_callback, /* 高流量callback函數 */
set_low_flow_callback: socket_set_low_flow_callback, /* 低流量callback函數 */
new_ipcmsg: socket_new_ipcmsg, /* 返回一個新建立的IPC信息 */
get_chan_status: socket_get_chan_status, /* 返回管道狀態 */
is_sendq_full: socket_is_sendq_full, /* 發送緩衝是否已滿 */
is_recvq_full: socket_is_recvq_full, /* 接收緩衝是否已滿 */
get_conntype: socket_get_conntype, /* 返回管道類型 */
/* 可以是IPC_SERVER , IPC_CLIENT , IPC_PEER */
};


節點間通信Plugin / 節點內通信交叉點
主要實現代碼在heartbeart.c中

Heartbeat通信媒介結構
struct hb_media {
void * pd; /* 自定義數據結構 */
const char * name; /* 媒介名 */
char* type; /* 媒介類型 */
char* description; /* 媒介描述 */
const struct hb_media_fns*vf; /* hbcomm媒介處理函數集 */
IPC_Channel* wchan[2]; /* Unix域寫子進程通信管道 */
IPC_Channel* rchan[2]; /* Unix域讀子進程通信管道 */
};


/* heartbeat發送信息集群 */
/* 1.發送消息到write_child子進程 */
send_cluster_msg{ /* 發送信息到集群 */

process_outbound_packet{ /* 帶包重傳控制 */
send_to_all_media{ /* 發送到所有媒介 */
for (j=0; j < nummedia; ++j) {
IPC_Channel* wch = sysmedia[j]->wchan[P_WRITEFD];

/* 發送到特定傳送媒介的寫子進程 */
wrc=wch->ops->send(wch, outmsg);
}
}
}
}

/* 2. write_child寫子進程發送消息到集群 */
write_child(){
IPC_Channel* ourchan = mp->wchan[P_READFD];
for(;;){
/* write_child通過Unix Domain Socket 接收heartbeat信息 */
IPC_Message* ipcmsg = ipcmsgfromIPC(ourchan); /* 調用ops->recv() */

/* 發送到集群其他節點 */
if (mp->vf->write(mp, ipcmsg->msg_body, ipcmsg->msg_len) != HA_OK) {
……
}
}
}


/* 從集群接收信息 */
/* 1. read_child讀子進程從集群接收消息 */
Read_child(){
IPC_Channel* ourchan = mp->rchan[P_READFD];
For(;;){
/* 從hbcomm PLUGIN接收 */
if ((pkt=mp->vf->read(mp, &pktlen)) == NULL) {
……
}
if (NULL != imsg){
/* read_child子進程通過UNIX Domain Socket, 發送到heartbeat */
rc = ourchan->ops->send(ourchan, imsg);
rc2 = ourchan->ops->waitout(ourchan);

}
}
}

/* 2. heartbeat從read_child子進程接收信息並進行處理 */
s = G_main_add_IPC_Channel(PRI_READPKT
, sysmedia[j]->rchan[P_WRITEFD], FALSE
, read_child_dispatch, sysmedia+j, NULL);
read_child_dispatch(){

msg = msgfromIPC(source, MSG_NEEDAUTH); /*調用ops->recv()從read_child讀 */
process_clustermsg(msg, lnk); /* 對讀到信息進行處理 */
}



heartbeat API Server端
struct api_query_handler query_handler_list[] = {
{API_SIGNOFF, api_signoff}, /* client登陸 */
{API_SETFILTER, api_setfilter}, /* 設置消息過濾 */
{API_SETSIGNAL, api_setsignal}, /* 設置消息到達信號通知 */
{API_NODELIST, api_nodelist}, /* 獲取節點列表 */
{API_NODESTATUS, api_nodestatus}, /* 查詢節點狀態 */
{API_NODETYPE, api_nodetype}, /* 查詢節點類型 */
{API_IFSTATUS, api_ifstatus}, /* 查詢心跳狀態 */
{API_IFLIST, api_iflist}, /* 查詢心跳列表 */
{API_CLIENTSTATUS, api_clientstatus}, /* 查詢client模塊狀態 */
{API_NUMNODES, api_num_nodes}, /* 返回集群普通節點數 */
{API_GETPARM, api_get_parameter}, /* 返回特定參數值 */
{API_GETRESOURCES, api_get_resources}, /* 返回資源狀態(兼容1.2.x以前版本) */
{API_GETUUID, api_get_uuid}, /* 取得節點uuid值 */
{API_GETNAME, api_get_nodename}, /* 取得節點名 */
{API_SET_SENDQLEN, api_set_sendqlen} /* 設置發送隊列長度 */
};


heartbeat API client端
static struct llc_ops heartbeat_ops = {
signon: hb_api_signon, /* 註冊新的heartbeat client */
signoff: hb_api_signoff, /* 註銷一個heartbeat client */
delete: hb_api_delete, /* 註銷結構 */
set_msg_callback: set_msg_callback, /* 設置某信息類型callback */
set_nstatus_callback: set_nstatus_callback, /* 設置節點狀態類型callback */
set_ifstatus_callback: set_ifstatus_callback, /* 設置心跳狀態類型callback */
set_cstatus_callback: set_cstatus_callback, /* 設置client狀態類型callback */
init_nodewalk: init_nodewalk, /* 初始化節點遍歷 */
nextnode: nextnode, /* 下一個節點 */
end_nodewalk: end_nodewalk, /* 結束節點遍歷 */
node_status: get_nodestatus, /* 節點當前狀態 */
node_type: get_nodetype, /* 節點類型 */
init_ifwalk: init_ifwalk, /* 初始化心跳遍歷 */
nextif: nextif, /* 下一個心跳介面 */
end_ifwalk: end_ifwalk, /* 結束心跳遍歷 */
if_status: get_ifstatus, /* 心跳當前狀態 */
client_status: get_clientstatus, /* client當前狀態 */
get_uuid_by_name: get_uuid_by_name, /* 根據名字取得uuid */
get_name_by_uuid: get_name_by_uuid, /* 根據uuid取得名字 */
sendclustermsg: sendclustermsg, /* 發送消息到cluster中所有成員*/
sendnodemsg: sendnodemsg, /* 發送消息到特定節點 */
sendnodemsg_byuuid: sendnodemsg_byuuid, /* 發送消息到特定節點(by uuid)*/
send_ordered_clustermsg:send_ordered_clustermsg, /* 發送順序集群信息 */
send_ordered_nodemsg: send_ordered_nodemsg, /* 發送順序節點信息 */
inputfd: get_inputfd, /* 返回和檢測信息到達*/
ipcchan: get_ipcchan, /* 返回IPC_Channel 類型ipc通道 */
msgready: msgready, /* 當有信息可讀時返回true*/
setmsgsignal: hb_api_setsignal, /* setmsgsignal */
rcvmsg: rcvmsg, /* 接收msg, 交給callback處理 */
readmsg: read_msg_w_callbacks, /* 返回沒有註冊callback的msg */
setfmode: setfmode, /* setfmode */
get_parameter: get_parameter,
get_deadtime: get_deadtime,
get_keepalive: get_keepalive,
get_mynodeid: get_mynodeid, /* 取得本地節點名 */
get_logfacility: get_logfacility, /* suggested logging facility */
get_resources: get_resources, /* 取得資源當前分佈狀態 */
chan_is_connected: chan_is_connected,
set_sendq_len: set_sendq_len, /* 設置發送緩存區長度 */
set_send_block_mode: socket_set_send_block_mode,
errmsg: APIError,
};
註:
Client端API函數集明顯比Server端查詢處理函數集要多,是因為有些功能不需要通過Server端查詢來得到。


[火星人 ] Heartbeat 通信層結構分析已經有786次圍觀

http://coctec.com/docs/program/show-post-71820.html