小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

我寫的多線程環(huán)形緩沖,沒有用鎖。大家看看有沒有錯誤和優(yōu)化的地方? - C/C - Ch...

 jijo 2008-08-29

要做分析網(wǎng)絡數(shù)據(jù)包的項目,我寫好的個多線程的環(huán)形緩沖,自己試了半天,好像沒錯誤,大家?guī)兔聪?。我沒有用鎖,鎖對性能影響好像很大。但我覺得還有可以優(yōu)化的地方。
背景要求:將捕獲到的包存入環(huán)形緩沖的一個單元,如果緩沖緩沖滿了(寫線程快讀線程一圈),就開一個是原來兩倍大小新環(huán)形緩沖。讀線程在處理完舊緩沖緩沖后,釋放舊緩沖空間。
兩個線程,生產(chǎn)者和消費者問題。
先說數(shù)據(jù)結構:
環(huán)形緩沖中有很多單元,每一個單元用于儲存一個包的數(shù)據(jù)。線程可以同時訪問環(huán)形緩沖這個大數(shù)據(jù)結構,但不能同時訪問某一個緩沖單元。所以我在每個單元里加了個標志,0表明可寫,1表明可讀。初始為0.這樣就不需要用到鎖或信號量。 當標志為0,寫線程可以訪問,寫完數(shù)據(jù)后,將標志改為1(都沒使用原子操作),并跳入下一個單元。讀線程在標志為1的時候,讀取其中的數(shù)據(jù),讀完后將標志改為0.

/*Cirbuf chain 寫線程可能很快 以至于不只有1個或2個環(huán)形緩沖,可能有多個*/
struct Cirbuf* packetCirbuf;

/*環(huán)形緩沖的一個單元*/
struct CirUnit
{
    int flag; /*0 表明可寫,1表明可讀*/
    char unit[UNITSIZE];
};

/*環(huán)形緩沖*/
struct Cirbuf
{
    struct Cirbuf * newBuf; /*point to new Cirbuf*/
    int readPoint; /*for reading thread*/
    int writePoint; /*for writing thread*/
    int bufSize;    /*buf char array's size*/
    struct CirUnit *buf;        /*point to unit array*/
};


讀寫線程
對于寫線程:
如果寫線程發(fā)現(xiàn)標志為0,這此緩沖單元可寫,將數(shù)據(jù)寫入。
如果寫線程發(fā)現(xiàn)當前標志為1,就表明已經(jīng)追上讀線程,于是要新開緩沖空間。并把新空間加入到環(huán)形緩沖區(qū)鏈表中。
對于讀線程:
如果發(fā)現(xiàn)標志位1,則單元可讀,讀出數(shù)據(jù),并把標志改為0.
如果讀線程發(fā)現(xiàn)當前標志為0,就表明 (1).追上了寫線程 (2)或舊緩沖區(qū)所有單元已經(jīng)被讀線程處理完,而此時寫線程已經(jīng)在處理新緩沖區(qū)。 對于這兩種情況的分辨,只要看下面Cirbuf結構的Cirbuf * newBuf 指針,如果指針為空,表明為第一種情況,否則為第二種。對于第二種情況,需要釋放舊的環(huán)形緩沖區(qū)空間,并將指針指向新的環(huán)形緩沖區(qū)。
讀線程在數(shù)據(jù)單元不可用時(讀線程追上寫線程),我最先寫代碼時是讓他continue while的循環(huán),后來改成了讓他delay 50ms,這里不知道怎么處理好。


extern "C" void* ReadThread(void *arg)
{
    while(1)
    {
        if(packetCirbuf->buf[packetCirbuf->readPoint].flag == 0)
        {
            if(packetCirbuf->newBuf != NULL)
            {
                /*表明readthread已經(jīng)處理完舊的緩沖區(qū)并且已經(jīng)有新的緩沖區(qū),這時應該釋放舊緩沖*/
                struct Cirbuf *temp = packetCirbuf;
                packetCirbuf = packetCirbuf->newBuf;
                FreeCirbuf(temp);
                continue;
            }
            /* delay*/
            pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
            pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
            struct timespec ts;
            int rv;
            ts.tv_sec = 0;
            ts.tv_nsec = 50000; /* 500,00 nanoseconds = 50 ms */
            pthread_mutex_lock(&mymutex);
            rv = pthread_cond_timedwait(&mycond, &mymutex, &ts);
            pthread_mutex_unlock(&mymutex);

            continue;
        }
        DoRead();
        packetCirbuf->buf[packetCirbuf->readPoint].flag = 0;
        if(++packetCirbuf->readPoint == packetCirbuf->bufSize)
            packetCirbuf->readPoint = 0;
    }
    return NULL;
}

extern "C" void* WriteThread(void *arg)
{
    struct Cirbuf* latestBuf;
    /*進寫線程后,初始化latestBuf指針 使其指向packetCirbuf,這時的packetCirbuf是由主線程函數(shù)創(chuàng)建的初始buf*/
    latestBuf = packetCirbuf;
    if(latestBuf == NULL)
    {
        cout << "packet capture buffer NULL error" << endl;
        exit(0);
    }
    /*進入循環(huán)*/
    char test=0;
    while(1)
    {
        if (latestBuf->buf[latestBuf->writePoint].flag == 1)
        {
            /*we need a larger buf*/
            latestBuf = GetNewCirbuf(latestBuf->bufSize * 2,latestBuf);
        }
        else
        {
            DoWrite(latestBuf,test++);
            latestBuf->buf[latestBuf->writePoint].flag = 1;
            if(++latestBuf->writePoint == latestBuf->bufSize)
                latestBuf->writePoint = 0;
        }
    }
    return NULL;
}


不需要用鎖的原因是 數(shù)據(jù)競爭發(fā)生在flag內(nèi)存單元,
讀線程如果取得flag為 1,但線程調(diào)度或另一個CPU核上的線程將flag改為0,就會發(fā)生錯誤。但只有讀線程自己會把flag從1變?yōu)?,所以不會出錯 同理寫進程也是。

程序應該有優(yōu)化的地方 比如讀進程追上寫進程的時候,是阻塞50ms還是直接continue還是有其他方法之類。

我測試是寫進程往緩沖寫數(shù)字1 2 3 讀進程打印出來,打印的結果沒有錯序,也沒有缺數(shù)發(fā)生。

整個程序代碼:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <iostream>
#include <thread.h>
#include <time.h>

#define INITBUFSIZE        10 /*環(huán)型緩沖區(qū)初始單元個數(shù)*/
#define UNITSIZE        200/*每單元大小*/

using namespace std;
/*global data*/
/*Cirbuf chain 寫進程可能很快 以至于不只有1個或2個環(huán)形緩沖,可能有多個*/
struct Cirbuf* packetCirbuf;

/*環(huán)形緩沖的一個單元*/
struct CirUnit
{
        int flag; /*0 表明可寫,1表明可讀*/
        char unit[UNITSIZE];
};

/*環(huán)形緩沖*/
struct Cirbuf
{
        struct Cirbuf * newBuf; /*point to new Cirbuf*/
        int readPoint; /*for reading thread*/
        int writePoint; /*for writing thread*/
        int bufSize;        /*buf char array's size*/
        struct CirUnit *buf;                /*point to unit array*/
};

struct Cirbuf* GetNewCirbuf(int bufSize,struct Cirbuf* oldBuf)
{
        if(bufSize>30000)
        {
                printf("oh my god,the bufSize is out of my league,I cannot handle it,exit");
                exit(1);
        }
        struct Cirbuf* newBuf = new Cirbuf();
        if(oldBuf != NULL)
                oldBuf->newBuf = newBuf;
        newBuf->newBuf = NULL;
        newBuf->readPoint = newBuf->writePoint = 0;
        newBuf->bufSize = bufSize;
        newBuf->buf = new CirUnit[bufSize];
        memset(newBuf->buf,0,sizeof(CirUnit)*bufSize); /*初始化單元為0*/
        return newBuf;
}

int FreeCirbuf(struct Cirbuf* bufPoint)
{
        delete bufPoint->buf;
        delete bufPoint;
        return 1;
}

void DoWrite(struct Cirbuf* latestBuf,char flag)
{
        latestBuf->buf[latestBuf->writePoint].unit[0] = flag;
        //printf("%d ",flag);
}

void DoRead()
{
        //cout<< packetCirbuf->buf[packetCirbuf->readPoint].unit[0] << endl;
        //printf("%d ",packetCirbuf->buf[packetCirbuf->readPoint].unit[0]);
        //printf(".");
}

extern "C" void* ReadThread(void *arg)
{
        while(1)
        {
                if(packetCirbuf->buf[packetCirbuf->readPoint].flag == 0)
                {
                        if(packetCirbuf->newBuf != NULL)
                        {
                                /*表明readthread已經(jīng)處理完舊的緩沖區(qū)并且已經(jīng)有新的緩沖區(qū),這時應該釋放舊緩沖*/
                                struct Cirbuf *temp = packetCirbuf;
                                packetCirbuf = packetCirbuf->newBuf;
                                FreeCirbuf(temp);
                                continue;
                        }
                        /* delay*/
                        pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
                        pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
                        struct timespec ts;
                        int rv;
                        ts.tv_sec = 0;
                        ts.tv_nsec = 50000; /* 500,00 nanoseconds = 50 ms */
                        pthread_mutex_lock(&mymutex);
                        rv = pthread_cond_timedwait(&mycond, &mymutex, &ts);
                        pthread_mutex_unlock(&mymutex);

                        continue;
                }
                DoRead();
                packetCirbuf->buf[packetCirbuf->readPoint].flag = 0;
                if(++packetCirbuf->readPoint == packetCirbuf->bufSize)
                        packetCirbuf->readPoint = 0;
        }
        return NULL;
}

extern "C" void* WriteThread(void *arg)
{
        struct Cirbuf* latestBuf;
        /*進寫進程后,初始化latestBuf指針 使其指向packetCirbuf,這時的packetCirbuf是由主線程函數(shù)創(chuàng)建的初始buf*/
        latestBuf = packetCirbuf;
        if(latestBuf == NULL)
        {
                cout << "packet capture buffer NULL error" << endl;
                exit(0);
        }
        /*進入循環(huán)*/
        char test=0;
        while(1)
        {
                if (latestBuf->buf[latestBuf->writePoint].flag == 1)
                {
                        /*we need a larger buf*/
                        latestBuf = GetNewCirbuf(latestBuf->bufSize * 2,latestBuf);
                }
                else
                {
                        DoWrite(latestBuf,test++);
                        latestBuf->buf[latestBuf->writePoint].flag = 1;
                        if(++latestBuf->writePoint == latestBuf->bufSize)
                                latestBuf->writePoint = 0;
                }
        }
        return NULL;
}

int main(int argc, char *argv[])
{
        pthread_t threadNum[2];
        packetCirbuf = GetNewCirbuf(INITBUFSIZE,NULL);
        thr_create(0,0,WriteThread,0,0,&threadNum[0]);
        thr_create(0,0,ReadThread,0,0,&threadNum[1]);
        thr_join(threadNum[0],NULL,NULL);
        thr_join(threadNum[1],NULL,NULL);
        FreeCirbuf(packetCirbuf);
}





    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多