|
3. 本地化隊列的實現(xiàn)思路 要給線程指定一個本地化隊列,通常的做法是先將創(chuàng)建好的隊列放入一個數(shù)組中,然后給線程編號,從0開始進(jìn)行編號,編號為0的線程對應(yīng)于數(shù)組下標(biāo)為0位置上存放的隊列,編號為1的線程對應(yīng)于數(shù)組下標(biāo)為1位置上存放的隊列,...。 每個線程要獲取自己的本地化隊列時,只需要先獲取線程編號,然后就可以通過線程編號去訪問對應(yīng)的隊列,由于每個線程的編號都不相同,因此每個線程訪問的隊列都不相同,即每個隊列只有一個線程訪問它,這樣就可以實現(xiàn)每個線程的本地化隊列。 那么如何給線程編號從0開始編號呢,操作系統(tǒng)并沒有直接提供這種功能。即使操作系統(tǒng)提供了線程從0開始編號的功能也沒有用,因為并不一定所有的線程都會訪問分布式隊列。例如有8個線程,其中編號為0,3,5,7的線程會訪問分布式隊列,那么在創(chuàng)建分布式隊列時,就需要創(chuàng)建8個本地隊列,否則線程編號將無法和存放隊列的數(shù)組下標(biāo)對應(yīng)起來。 看到這里,目標(biāo)已經(jīng)很明確了,那就是要給所有訪問分布式隊列的線程從0開始依次編號。比如有N個線程要訪問分布式隊列,那么需要給這N個線程依次編號為0,1,...N-1。下面就來討論如何給線程編號的問題。 4. 給線程編號的方法 在操作系統(tǒng)中,通常提供了線程本地存儲的API,通過API可以給每個線程設(shè)定一個數(shù)據(jù)(可以是指針,也可以是一個整數(shù)),同時也可以通過API來取出當(dāng)前線程設(shè)置的那個數(shù)據(jù)。比如給一個線程A設(shè)定一個整數(shù)0,那么線程A執(zhí)行的任何地方都可以調(diào)用相應(yīng)的API獲取到整數(shù)0,這樣就可以在程序的任何地獲取到線程A的編號為0。 在Windows系列操作系統(tǒng)中,提供了Tls_Alloc(),Tls_SetValue(),Tls_GetValue(),Tls_Free()這幾個函數(shù)來實現(xiàn)線程本地存儲操作。 pthread中,可以通過pthread_key_create(), pthread_setspecific(), pthread_getspecific()等函數(shù)來實現(xiàn)線程本地存儲操作,其中pthread_create_key()和Tls_Alloc()功能相同,只是參數(shù)有所不同,Tls_SetValue()和pthread_setspecific()功能等價,Tls_GetValue()和pthread_getspecific()功能等價。 下面演示一下TlsAlloc(),Tls_SetValue(),Tls_GetValue(),Tls_Free()這幾個函數(shù)的基本用法。 DWORD g_dwTlsIndex; LONG volatile g_dwThreadId = 0;
int GetId() { //獲取當(dāng)前執(zhí)行線程的由TlsSetValue()設(shè)置的值 int nId = (int)TlsGetValue(g_dwTlsIndex); return (nId-1); }
void ThreadFunc(void *args) { LONG Id = AtomicIncrement (&g_dwThreadId); //對g_dwThreadId進(jìn)行原子加1操作 TlsSetValue(g_dwTlsIndex, (void *)Id); //給當(dāng)前執(zhí)行的線程設(shè)置一個值
printf("ThreadFunc2: Thread Id = %ld\n", GetId()); }
int main(int argc, char* argv[]) { g_dwTlsIndex = TlsAlloc(); //分配一個線程本地存儲索引,需要在創(chuàng)建線程前執(zhí)行
_beginthread(ThreadFunc, 0, NULL); _beginthread(ThreadFunc, 0, NULL);
Sleep(100); //延時等待上面兩個線程執(zhí)行完 TlsFree(g_dwTlsIndex); return 0; } 需要說明一下,在ThreadFunc()函數(shù)中,使用了一個AtomicIncrement()函數(shù),這個函數(shù)相當(dāng)于Windows操作系統(tǒng)中的InterlockedIncrement()函數(shù)。在Widnows系統(tǒng)中,可以使用以下宏定義來實現(xiàn)AtomicIncrement()函數(shù): #define AtomicIncrement(x) InterlockedIncrement(x) 上面程序在運行后,會打印出以下結(jié)果: ThreadFunc: Thread Id = 0 ThreadFunc: Thread Id = 1
從上面代碼和執(zhí)行結(jié)果可以看出,雖然GetValue()在ThreadFunc()函數(shù)中執(zhí)行,但是兩個線程執(zhí)行GetValue()得到的值是不同的,一個線程得到的是0,另外一個線程得到的是1。這主要是因為兩個線程調(diào)用TlsSetValue()設(shè)置的值并不相同,一個為1,另一個為2。 需要注意的是,TlsGetValue()的返回值為0表示失敗,所以使用TlsSetValue()函數(shù)時,應(yīng)該從1開始設(shè)置,然后在GetId()函數(shù)中,返回的是TlsGetValue()的返回值減1。 采用上面的方法,就可以設(shè)計出分布式隊列中的線程Id自動編號和獲取功能了。下面是詳細(xì)的實現(xiàn)代碼: class CDistributedQueue { private: DWORD m_dwTlsIndex; LONG volatile m_lThreadIdIndex; public: CDistributedQueue(); virtual ~CDistributedQueue(); LONG ThreadIdGet(); //可以添加其他成員函數(shù)在下面 };
CDistributedQueue::CDistributedQueue() { m_dwTlsIndex = TlsAlloc(); m_lThreadIdIndex = 0; }
CDistributedQueue::~CDistributedQueue() { TlsFree(m_dwTlsIndex); }
LONG CDistributedQueue::ThreadIdGet() { LONG Id = (LONG )TlsGetValue(m_dwTlsIndex); if ( Id == 0 ) { Id = AtomicIncrement(&m_lThreadIdIndex); TlsSetValue(Id); } return (Id - 1); } 上面的代碼中,設(shè)置或獲取線程編號都在ThreadIdGet()一個成員函數(shù)內(nèi)完成,先判斷獲取的Id是否為0,如果為0,表明線程還沒有被設(shè)置Id,因此將m_lThreadIdIndex原子加1,然后再設(shè)置給對應(yīng)的線程。每調(diào)用一次TlsSetValue()函數(shù),其設(shè)置的Id值依次加1,這樣就可以得到一個1,2,3,...序列。每個線程調(diào)用了TlsSetValue()函數(shù)后,下一個調(diào)用TlsGetValue()函數(shù)時,獲得的值一定大于0,因此每個線程最多只能執(zhí)行TlsSetValue()函數(shù)一次。 采用上面的方法來獲取線程編號,必須保證創(chuàng)建的本地隊列數(shù)量大于等于訪問隊列的線程數(shù)量,否則隊列數(shù)量不足,將會造成沒有足夠的本地隊列供線程使用,程序中可能會造成越界等不可預(yù)測的異常。常用的解決辦法是將本地隊列的數(shù)量擴(kuò)大一倍。 上面這種線程編號方法,非常方便,任何訪問分布式隊列的線程都可以被自動編號,調(diào)用分布式隊列的線程不需要為編號操心。 有了給線程自動編號的方法后,就可以實現(xiàn)分布式隊列的各個具體操作如進(jìn)隊、出隊等。當(dāng)然在實現(xiàn)具體的操作代碼前,有必要了解一下分布式隊列中是如何進(jìn)行進(jìn)隊和出隊操作的。 |
|
|