小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

使用epoll实现客户端UDP并发

 zhouADNjj 2014-04-29
网络程序为了支持并发,可以采用select,多线程等技术.
但是对于select,redhat linux系统只支持最大1024个描述符.
因此要想同时并发超过1024,就无法使用select模式.
而使用多线程,并发数达到1000时将严重影响系统的性能.

而使用epoll可以避免以上的缺陷.
下面是一个使用epoll实现客户端UDP并发的例子.是我为写压力测试程序而写的.
发送使用一个独立的线程,接收使用epoll调用.

在程序开始要先设置系统能打开的最大描述符限制.即setrlimit调用.

在linux redhat enterprise 4环境下测试通过。其它环境我没测过。

g++ -o  udp_epoll_c udp_epoll_c.cpp -lpthread

/***************************************************************************
               file: udp_epoll_c.cpp
              -------------------
    begin : 2006/01/17
    copyright : (C) 2005 by 張薦林
    email : zhangjianlin_8 at 126.com
***************************************************************************/


/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/



#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <iostream>
#include <string>
#include <vector>
using namespace std;
/*
 * Read up to `length` bytes from `fd` into `buffer`, calling read()
 * repeatedly until the request is satisfied or read() reports 0 bytes.
 *
 * A read() interrupted by a signal (EINTR) is retried.  Any other read()
 * error makes the function return -1, even if some bytes were already
 * stored in `buffer`.
 *
 * Returns the number of bytes stored (possibly fewer than `length` when
 * read() returned 0), or -1 on error.
 */
int Read(int fd,void *buffer,unsigned int length)
{
    char *cursor = (char *)buffer;
    unsigned int remaining = length;

    while (remaining != 0)
    {
        int got = read(fd, cursor, remaining);

        if (got == 0)
        {
            /* Nothing more to read: report what we have so far. */
            break;
        }
        if (got < 0)
        {
            if (errno != EINTR)
            {
                return -1;
            }
            /* Interrupted before any data arrived: retry without advancing. */
            got = 0;
        }

        remaining -= got;
        cursor += got;
    }

    return length - remaining;
}

/*
 * Write exactly `length` bytes from `buffer` to `fd`, calling write()
 * repeatedly until everything has been accepted.
 *
 * A write() interrupted by a signal (EINTR) is retried.  Any other write()
 * error returns -1 with errno left as set by write().  A write() that
 * reports 0 bytes for a non-empty request is treated as a failure (errno is
 * set to EIO): write() does not set errno in that case, so consulting errno
 * there would act on a stale value and could spin forever.
 *
 * Returns `length` on success (0 when `length` is 0), or -1 on error.
 */
int Write(int fd,const void *buffer,unsigned int length)
{
    const char *cursor = (const char *)buffer;
    unsigned int remaining = length;

    while (remaining > 0)
    {
        ssize_t written = write(fd, cursor, remaining);

        if (written < 0)
        {
            if (errno == EINTR)
            {
                /* Interrupted before anything was written: just retry. */
                continue;
            }
            return -1;
        }
        if (written == 0)
        {
            /* No progress and no errno from write(): fail explicitly. */
            errno = EIO;
            return -1;
        }

        remaining -= (unsigned int)written;
        cursor += written;
    }

    return (int)length;
}
/*
 * Start a new thread running start_routine(arg).
 *
 * thread: optional; receives the new thread's id when non-NULL.
 * pAttr:  optional thread attributes.  When NULL, the thread is created
 *         detached with a 1 MiB stack.  A caller-supplied attribute object
 *         is used as-is and stays owned by the caller: it is NOT destroyed
 *         here, so it can be reused or destroyed by the caller afterwards.
 *
 * Returns 0 on success, otherwise the error number reported by
 * pthread_attr_init() or pthread_create() (errno is not set).
 */
int CreateThread(void *(*start_routine)(void *), void *arg = NULL, pthread_t *thread = NULL, pthread_attr_t *pAttr = NULL)
{
    pthread_attr_t defaultAttr;
    bool ownsAttr = false;

    if (pAttr == NULL)
    {
        int rc = pthread_attr_init(&defaultAttr);
        if (rc != 0)
        {
            return rc;
        }
        ownsAttr = true;
        pthread_attr_setstacksize(&defaultAttr, 1024 * 1024); // 1 MiB stack
        pthread_attr_setdetachstate(&defaultAttr, PTHREAD_CREATE_DETACHED);
        pAttr = &defaultAttr;
    }

    // pthread_create() needs somewhere to store the id even if the caller
    // is not interested in it.
    pthread_t tid;
    if (thread == NULL)
    {
        thread = &tid;
    }

    int r = pthread_create(thread, pAttr, start_routine, arg);

    // Only release the attribute object this function initialised itself.
    if (ownsAttr)
    {
        pthread_attr_destroy(&defaultAttr);
    }
    return r;
}

/*
 * Raise the per-process open-file limit (RLIMIT_NOFILE) to 20480 for both
 * the soft and the hard limit.
 *
 * If that request is refused (for example because the process may not raise
 * its hard limit), the soft limit is raised as far as the current hard limit
 * allows so that the program can still open as many descriptors as the
 * system permits.
 *
 * Returns 0 when the limit was set to 20480, -1 otherwise (including the
 * case where only the best-effort fallback succeeded).
 */
static int SetRLimit()
{
    const rlim_t wanted = 20480;
    struct rlimit rlim;

    rlim.rlim_cur = wanted;
    rlim.rlim_max = wanted;
    if (setrlimit(RLIMIT_NOFILE, &rlim) == 0)
    {
        printf("setrlimit ok\n");
        return 0;
    }
    perror("setrlimit");

    /* Best effort: lift the soft limit up to whatever the hard limit allows. */
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
    {
        perror("getrlimit");
        return -1;
    }
    if (rlim.rlim_max == RLIM_INFINITY || rlim.rlim_max > wanted)
    {
        rlim.rlim_cur = wanted;
    }
    else
    {
        rlim.rlim_cur = rlim.rlim_max;
    }
    if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
    {
        perror("setrlimit");
    }
    else
    {
        printf("setrlimit: soft limit raised to %lu\n", (unsigned long)rlim.rlim_cur);
    }
    return -1;
}

/*
 * Switch descriptor `sock` to non-blocking mode by adding O_NONBLOCK to its
 * file status flags; the other flags are left as they were.
 *
 * Returns 0 on success, -1 if either fcntl() call fails.
 */
int setnonblocking(int sock)
{
    const int flags = fcntl(sock, F_GETFL);
    if (flags < 0)
    {
        return -1;
    }

    return (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) ? -1 : 0;
}

/*
 * Create an IPv4 UDP socket and connect() it to host:port, so that plain
 * read()/write() can be used on the returned descriptor.
 *
 * host: IPv4 address in the textual forms accepted by inet_addr(); host
 *       names are not resolved.  NULL or an unparsable address is rejected
 *       before any socket is created.
 * port: UDP port in host byte order.
 *
 * Returns the connected socket descriptor, or -1 on failure (a diagnostic
 * is written to stderr).  The caller owns the descriptor and must close()
 * it.
 */
int ConnectToUdperver(const char *host, unsigned short port)
{
    if (host == NULL)
    {
        fprintf(stderr, "ConnectToUdperver: host is NULL\n");
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(host);
    /* inet_addr() signals a parse failure with INADDR_NONE. */
    if (addr.sin_addr.s_addr == INADDR_NONE)
    {
        fprintf(stderr, "ConnectToUdperver: invalid IPv4 address '%s'\n", host);
        return -1;
    }

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
    {
        perror("socket");
        return -1;
    }

    if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        perror("connect");
        close(sock);
        return -1;
    }
    return sock;
}

void *SendThread(void *arg)
{
vector<int> sockets;
sockets = *((vector<int> *)arg);

int n = 0;
char data[1024];
int i = 0;
while(1)
{
  for(vector<int>::iterator itr = sockets.begin(), last = sockets.end(); itr != last; ++itr)
  {
   sprintf(data, "test data %d\n", i++);
   n = Write(*itr, "hello", 5);
   printf("socket %d write to server[ret = %d]:%s",*itr, n, data);
  }
  sleep(1);
}
}

int main(int argc, char **argv)
{
SetRLimit();
printf("FD_SETSIZE= %d\n", FD_SETSIZE);
if (argc != 3)
{
  printf("usage: %s <IPaddress> <PORT>\n", argv[0]);
  return 1;
}

int epfd = epoll_create(20480);
if(epfd < 0)
{
  perror("epoll_create");
  return 1;
}
struct epoll_event event;
struct epoll_event ev[20480];
vector<int> sockets;
for(int i = 0; i < 3000; i++)
{
  int sockfd = ConnectToUdperver(argv[1], (unsigned short)(atoi(argv[2])));
  if(sockfd < 0)
  {
   printf("Cannot connect udp server %s %s\n", argv[1], argv[2]);
   return 1;
  }
  
  sockets.push_back(sockfd);
  setnonblocking(sockfd);
  event.data.fd = sockfd;
     event.events = EPOLLIN|EPOLLET;
     if(0 != epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event))
  {
   perror("epoll_ctl");
  }
}
if(0 != CreateThread(SendThread, (void *)&sockets))
{
  perror("CreateThread");
  return 2;
}
int nfds = 0;
while(1)
{
        nfds=epoll_wait(epfd,ev,20480,500);
  if(nfds < 0)
  {
   perror("epoll_wait");
   break;
  }
  else if(nfds == 0)
  {
   printf("epoll_wait timeout!\n");
   continue;
  }
  for(int i = 0; i < nfds; i++)
  {
   if(ev[i].events & EPOLLIN)
   {
    printf("can read for %d now\n", ev[i].data.fd);
    char data[1024] = {0};
    int n = read(ev[i].data.fd, data, sizeof(data));
    printf("Received %d bytes from server!\n", n);
   }
  }
}
for(vector<int>::iterator itr = sockets.begin(), last = sockets.end(); itr != last; itr++)
{
  close(*itr);
}
close(epfd);
return 0;
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0)

    0条评论

    发表

    请遵守用户 评论公约

    類似文章 更多