秦飛



摘 要:在SylixOS下,介紹了基于傳統消息隊列和高效協程模式兩種生產者-消費者模型的編程方法,比較兩種模式的編程方法在互斥和同步機制上的區別及其帶來的消耗。
關鍵詞:SylixOS;同步和互斥;協程
中圖分類號:TP311.1? ? ? ? ? ? 文獻標識碼:A
引言:
傳統的生產者-消費者模型中,生產者和消費者之間的關系可以總結為如下幾點:
●生產者與生產者:互斥關系
●消費者與消費者:互斥關系
●生產者與消費者:互斥與同步關系
要處理好這些關系,就要用到系統中的互斥與同步機制,并涉及到線程的阻塞和喚醒操作,而這些機制往往會帶來一些必要的消耗,影響運行效率。
協程,又稱作協同程序,是比線程還小的可執行代碼序。在SylixOS中,一個線程可以擁有多個協程,這些協程共享了線程除了棧以外的所有資源,包括優先級、內核對象等。線程內部的協程不可被搶占,只能輪轉運行。所以,利用這個特點可以構建出一個基于協程的高效的生產者-消費者模型。
1、何為生產者消費者模型
經典生產者-消費者模型描述如下:兩個線程共享一個消息隊列,其中一個線程,即生產者,向該消息隊列中放入信息;另外一個線程,即消費者,從該消息隊列中取走信息,這里就涉及到了多任務操作系統中的同步和互斥機制。
在SylixOS中,同步即一個任務的執行結果是另一個任務能夠執行的前提條件,互斥即多任務同時使用到臨界資源時可能會引發的競爭。在生產者-消費者模型中,消費者任務需要等待生產任務產生數據后才能執行,即同步;而生產任務和消費任務要串行地訪問消息隊列,即互斥。
2、生產者消費者模型的兩種編程方法
2.1 傳統消息隊列方式
該方式中,生產者-消費者模型中的公共緩沖區即對應著消息隊列。編程步驟如下:
① 創建生產者線程;
② 創建消費者線程;
③ 創建消息隊列,獲取操作句柄;
④ 生產者線程中循環調用消息發送函數,向消息隊列中發送數據;
⑤ 消費者線程中循環調用消息接收函數,從消息隊列中獲取數據,若消息隊列中無數據,則會阻塞。
2.2 高效率協程模式
SylixOS中的協程需要依賴所屬線程被調度時執行,所以需要先創建一個初始線程,再去調用相應的函數創建生產者和消費者協程,需要注意的是,每一個線程創建時都會默認創建一個初始協程,并且線程總是從該初始協程開始運行,所以初始線程中需要主動放棄處理器,以保證生產者和消費者協程能運行。編程步驟如下:
① 創建初始線程;
② 創建生產者協程;
③ 創建消費者協程;
④ 初始線程放棄處理器調用;
⑤ 生產者協程向公共緩沖區填充消息后,主動放棄處理器;
⑥ 消費者協程從公共緩沖區獲取消息后,主動放棄處理器;
⑦ 初始線程恢復生產者和消費者協程,跳轉至步驟⑤。
2.3 兩種編程模式的比較
通過上述兩種模式的步驟比較,可以得出以下結論:
① 傳統消息隊列模式中,需要創建多線程并進行線程間的切換,同時生產者和消費者線程間的通信采用消息隊列的形式,其發送和接收函數內部實現涉及到進入和退出內核態的模式切換,以保證生產者和消費者線程中的同步和互斥機制,這里面會有較大的時間消耗;
② 高效協程模式中,只有一個初始線程,內部的協程并不會被調度器調度,所以不會有線程切換的消耗。另外,初始線程內部的協程是以輪轉的方式運行,由用戶程序自行管理,所以也就不會有線程間通信帶來的消耗。
綜上,可以看出,協程模式比傳統消息隊列模式更高效。
3、模型編程流程設計
基于兩種方法設計應用流程圖及其簡單說明
3.1 消息隊列方法
3.2 協程方法
4、代碼設計
4.1 消息隊列方法
基于消息隊列的設計代碼中,生產者線程獲取當前時間t1并填入消息隊列中,消費者線程從消息隊列中獲取消息后再次獲取當前時間t2,將t1和t2的差值作為該方法帶來的時間消耗。示例代碼如下。
#include
#include
#include
#include
#define MAX_MSG_COUNT? ?100
#define MAX_MSG_BYTES? ?sizeof(struct timeval)
LW_HANDLE? ?_G_hMsgQ = 0;
static PVOID __threadConsume (PVOID? pvArg)
{
struct timeval? end;
struct timeval? recv;
INT? ? iError? = PX_ERROR;
double t? ? ? ?= 0;
size_t stLen? ?= 0;
INT? ? iMsgCnt = 0;
while (1) {
lib_memset(&end, 0, sizeof(struct timeval));
lib_memset(&recv, 0, sizeof(struct timeval));
iError = Lw_MsgQueue_Receive(_G_hMsgQ,
(PVOID)&recv,
sizeof(struct timeval),
&stLen,
LW_OPTION_WAIT_INFINITE);
if (ERROR_NONE != iError) {
printf("msg recv error.\n");
break;
}
lib_gettimeofday(&end, LW_NULL);
t += end.tv_sec * 1e6 + end.tv_usec - (recv.tv_sec * 1e6 + recv.tv_usec);
if (++iMsgCnt == 100) {
printf("legancy msgQ: consumer get msg cost: %.02f us\n", t / iMsgCnt);
iMsgCnt = 0;
t? ? ? ?= 0;
}
}
return? (LW_NULL);
}
static PVOID __threadProduct (PVOID? pvArg)
{
struct timeval? start;
INT? ? ? ? ? ? ?iError;
while (1) {
lib_memset(&start, 0, sizeof(struct timeval));
lib_gettimeofday(&start, LW_NULL);
iError = Lw_MsgQueue_Send(_G_hMsgQ, (PVOID)&start, sizeof(struct timeval));
if (ERROR_NONE != iError) {
printf("msg send error.\n");
break;
}
Lw_Time_Sleep(1);
}
return? (LW_NULL);
}
int main (int argc, char **argv)
{
LW_HANDLE? hProduct;
LW_HANDLE? hConsume;
_G_hMsgQ = Lw_MsgQueue_Create("msg_cp",
MAX_MSG_COUNT,
MAX_MSG_BYTES,
LW_OPTION_WAIT_FIFO | LW_OPTION_OBJECT_LOCAL,
LW_NULL);
hProduct = Lw_Thread_Create("t_product", __threadProduct, LW_NULL, LW_NULL);
if (LW_OBJECT_HANDLE_INVALID == hProduct) {
printf("t_product create failed.\n");
return? (-1);
}
hConsume = Lw_Thread_Create("t_consume", __threadConsume, LW_NULL, LW_NULL);
if (LW_OBJECT_HANDLE_INVALID == hConsume) {
printf("t_consume create failed.\n");
return? (-1);
}
Lw_Thread_Join(hProduct, LW_NULL);
Lw_Thread_Join(hConsume, LW_NULL);
Lw_MsgQueue_Delete(&_G_hMsgQ);
return? (0);
}
4.2 協程方法
基于協程的設計代碼中,生產者協程將數據放入公共緩沖區,并獲取當前時間t1,然后放棄CPU,接著消費者協程中獲取時間t2,并從公共緩沖區獲取數據,將t1和t2的差值作為生產者和消費者協程切換的時間消耗。示例代碼如下。
#include
#include
static PVOID? pcCrcb0 = LW_NULL;
static PVOID? pcCrcb1 = LW_NULL;
static double t? ? ? ?= 0;
static INT? ? iCnt? ? = 0;
static struct timeval? start;
static struct timeval? end;
VOID? coroutine0 (PVOID? pvArg)
{
while (1) {
lib_memset(&start, 0, sizeof(struct timeval));
lib_gettimeofday(&start, LW_NULL);
Lw_Coroutine_Yield();
}
}
VOID? coroutine1 (PVOID? pvArg)
{
while (1) {
lib_memset(&end, 0, sizeof(struct timeval));
lib_gettimeofday(&end, LW_NULL);
t += end.tv_sec * 1e6 + end.tv_usec - (start.tv_sec * 1e6 + start.tv_usec);
if (++iCnt == 100) {
printf("coroutine: consumer get msg cost: %.02f us\n", t / iCnt);
iCnt = 0;
t? ? = 0;
}
Lw_Coroutine_Yield();
}
}
PVOID? tTest (PVOID? pvArg)
{
pcCrcb0 = Lw_Coroutine_Create(coroutine0, 2 * 1024, LW_NULL);
if (pcCrcb0 == LW_NULL) {
return? (LW_NULL);
}
pcCrcb1 = Lw_Coroutine_Create(coroutine1, 2 * 1024, LW_NULL);
if (pcCrcb1 == LW_NULL) {
return? (LW_NULL);
}
Lw_Coroutine_Yield();? ? ? ? ? ? ? ? ? ? /*? 使其他協程運行? ? ? ? ? ? ?*/
while (1) {
Lw_Coroutine_Resume(pcCrcb0);
Lw_Time_Sleep(1);
}
return? ((PVOID)1);
}
int main (int argc, char *argv[])
{
LW_HANDLE? hId;
hId = Lw_Thread_Create("t_test", tTest, LW_NULL, LW_NULL);
if (hId == LW_HANDLE_INVALID) {
return? (PX_ERROR);
}
Lw_Thread_Join(hId, LW_NULL);
return? (ERROR_NONE);
}
5、測試驗證
5.1 環境搭建
硬件環境:
● CPU:Intel(R) Core(TM) i3-6100 CPU @ 3.70GHz
● 內存:4GB
● 硬盤:500GB
軟件環境
● SylixOS內核版本:1.9.9-9
● SylixOS BSP版本:1.1.3
5.2 測試輸出
① 傳統消息隊列模式
② 高效協程模式
5.3 測試分析
根據代碼運行結果,可以看出生產者-消費者模型中,采用協程模式比傳統消息隊列模式時間消耗要小得多。
6、結語
在嵌入式領域的資源受限,生產消費模型簡單情況下,使用高效協程編程可以有效降低傳統消息隊列模式下同步和互斥機制、線程調度等帶來的時間消耗,提高生產者和消費者的運行效率。SylixOS內核原生支持協程,而不是使用第三方庫模擬,使得SylixOS內部的協程管理更加便捷高效,這樣使用SylixOS協程構建的生產者-消費者模型也就更加高效。
參考文獻:
[1] 陳莉君. Linux內核編程[M]. 北京:機械工業出版社,2004
[2] 劉俞. Linux多線程的互斥與同步控制及實踐. 電腦知識與技術[J]. 2005,(18)
[3] 曹聰,范廉明. 操作系統原理與分析. 科學出版社[M]. 2003,09