マルチスレッドでのechoサーバ
マルチスレッドでのechoサーバのメモ
server.cc
#include <iostream> #include <sstream> #include <pthread.h> #include "ServerSocket.h" #include "Queue.h" using namespace std; string readLine(int sock){ char c; stringstream buffer; while(1){ int ret = read(sock, &c , 1); if(ret == -1){ perror("read"); break; }else if(ret == 0){ break; } buffer << c; if(c == '\n'){ break; } } return buffer.str(); } void * handleRequest(void * arg){ Queue<int> * Q = static_cast< Queue<int> *> (arg); while(1){ int conn = Q->Pop(); while(1){ string line = readLine(conn); if(line.size() == 0)break; //注:1度のwriteで必ずしもソケットにすべて書けるわけではない write(conn , line.c_str() , line.size()); } close(conn); } return NULL; } int main(){ int numThread = 10; ServerSocket sock(8080 , 128); int r = sock.Listen(); cout << r << endl; Queue<int> Q(numThread); for(int i = 0 ; i < numThread ; i++){ pthread_t thread; pthread_create(&thread , NULL , handleRequest , &Q); } while(1){ int conn = sock.Accept(); int res = Q.Push(conn); if(res != 0){ cerr << "Queue is full" << endl; close(conn); } } }
Queue.h
#ifndef QUEUE_H_
#define QUEUE_H_

#include <pthread.h>

// Bounded FIFO queue shared between threads, implemented as a ring buffer
// guarded by a pthread mutex and a "not empty" condition variable.
//
// Push() never blocks: it fails with -1 when the queue is full.
// Pop() blocks until an element is available.
// T must be default-constructible and assignable.
template<typename T>
class Queue{
public:
    // Creates a queue holding at most `capacity` elements. A non-positive
    // capacity yields a queue on which every Push() fails.
    Queue(int capacity)
        : capacity_(capacity > 0 ? capacity : 0),
          size_(0),
          head_(0),
          tail_(0),
          // Allocate before the mutex/condvar are initialised so that a
          // throwing allocation leaves nothing behind to clean up.
          buffer_(new T[capacity_]){
        pthread_mutex_init(&mutex_ , NULL);
        pthread_cond_init(&not_empty , NULL);
    }

    ~Queue(){
        delete[] buffer_;
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&not_empty);
    }

    // Appends `element`. Returns 0 on success, -1 if the queue is full.
    int Push(T element);

    // Removes and returns the oldest element, blocking while the queue is empty.
    T Pop();

private:
    // Non-copyable: the queue owns its buffer, mutex and condition variable.
    // Declared but intentionally not defined.
    Queue(const Queue &);
    Queue & operator=(const Queue &);

    pthread_mutex_t mutex_;
    pthread_cond_t not_empty;
    int capacity_;
    int size_;   // number of stored elements
    int head_;   // index of the next element to pop
    int tail_;   // index of the next free slot
    T * buffer_;
};

template<typename T>
int Queue<T>::Push(T element){
    pthread_mutex_lock(&mutex_);
    if(size_ >= capacity_){
        pthread_mutex_unlock(&mutex_);
        return -1;
    }
    buffer_[tail_++] = element;
    if(tail_ == capacity_){
        tail_ = 0;  // wrap around
    }
    size_++;
    pthread_mutex_unlock(&mutex_);
    // Wake one waiting consumer; signalling after unlock avoids waking a
    // thread that would immediately block on the mutex.
    pthread_cond_signal(&not_empty);
    return 0;
}

template<typename T>
T Queue<T>::Pop(){
    pthread_mutex_lock(&mutex_);
    // Loop rather than `if`: condition waits may wake spuriously, and another
    // consumer may have taken the element first.
    while(size_ == 0){
        pthread_cond_wait(&not_empty , &mutex_);
    }
    T res = buffer_[head_++];
    if(head_ == capacity_){
        head_ = 0;  // wrap around
    }
    size_--;
    pthread_mutex_unlock(&mutex_);
    return res;
}

#endif  // QUEUE_H_
ServerSocket.h
#ifndef SERVER_SOCKET_H_
#define SERVER_SOCKET_H_

// Listening TCP socket bound to a port.
//
// The object owns the listening descriptor: the destructor calls Close().
class ServerSocket{
public:
    // Records the port to bind and the backlog passed to listen(); no socket is
    // opened until Listen() is called.
    ServerSocket(int port , int backlog)
        : port_(port),
          backlog_(backlog),
          listenfd_(-1){}

    ~ServerSocket(){
        Close();
    }

    // Creates, binds and starts listening on the socket.
    // Returns 0 on success, -1 on failure.
    int Listen();

    // Accepts one connection.
    // Returns the connected descriptor, or -1 on failure.
    int Accept();

    // Closes the listening descriptor if one is open.
    // Returns 0 on success (or if nothing was open), -1 on failure.
    int Close();

private:
    // Non-copyable: a copy would close the same descriptor a second time.
    // Declared but intentionally not defined.
    ServerSocket(const ServerSocket &);
    ServerSocket & operator=(const ServerSocket &);

    int port_;
    int backlog_;
    int listenfd_;  // -1 while no listening socket is open
};

#endif  // SERVER_SOCKET_H_
ServerSocket.cpp
#include <iostream> #include <sstream> #include <string> #include <cstdio> #include <cstdlib> #include <cstring> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netdb.h> #include "ServerSocket.h" using namespace std; int ServerSocket::Listen(){ struct addrinfo hints; struct addrinfo *res , *rp; memset(&hints , 0 , sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; { stringstream ss; ss << port_; getaddrinfo(NULL , ss.str().c_str(), &hints , &res); } int sockfd = -1; for(rp = res ; rp != NULL ; rp = rp->ai_next){ sockfd = socket(rp->ai_family , rp->ai_socktype , rp->ai_protocol); if(sockfd == -1) continue; if(bind(sockfd , rp->ai_addr , rp->ai_addrlen) == 0) break; close(sockfd); } if(rp == NULL){ cerr << "ServerSocket::Listen() Bind Fail" << endl; return -1; } freeaddrinfo(res); int lres = listen(sockfd , backlog_); if(lres == -1){ perror("ServerSocket::Listen()"); return -1; } listenfd_ = sockfd; return 0; } int ServerSocket::Accept(){ int connfd = accept(listenfd_, NULL , NULL); if(connfd < 0){ perror("ServerSocket::Accept()"); return -1; } return connfd; } int ServerSocket::Close(){ if(listenfd_ >= 0){ int res = close(listenfd_); if(res == -1){ perror("ServerSocket::Close()"); return -1; } } return 0; }