マルチスレッドでのechoサーバ

マルチスレッドでのechoサーバのメモ

server.cc

#include <pthread.h>
#include <signal.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <iostream>
#include <sstream>
#include <string>

#include "ServerSocket.h"
#include "Queue.h"
using namespace std;

// Reads one line from the descriptor `sock`, one byte at a time.
//
// Reading stops once a '\n' has been consumed, at end-of-file, or when
// read() reports an error (the error is printed with perror). The trailing
// '\n', when one was read, is kept in the returned string.
//
// Returns the bytes collected so far. An empty string means that EOF or an
// error was hit before any byte arrived.
std::string readLine(int sock){
  std::string line;
  for(;;){
    char c;
    const ssize_t ret = read(sock, &c, 1);
    if(ret == -1){
      // EINTR only means a signal arrived before any data was transferred;
      // the descriptor is still healthy, so retry instead of cutting the
      // line short.
      if(errno == EINTR){
        continue;
      }
      perror("read");
      break;
    }
    if(ret == 0){
      break;  // end-of-file
    }
    line.push_back(c);
    if(c == '\n'){
      break;
    }
  }
  return line;
}

// Writes all `len` bytes of `data` to `fd`.
//
// A single write() may accept fewer bytes than requested, so the call is
// repeated on the unsent remainder. Returns true once everything was
// written, false if write() failed (the error is printed with perror).
static bool writeAll(int fd, const char * data, size_t len){
  size_t sent = 0;
  while(sent < len){
    const ssize_t n = write(fd, data + sent, len - sent);
    if(n == -1){
      if(errno == EINTR){
        continue;  // interrupted before anything was written: retry
      }
      perror("write");
      return false;
    }
    sent += static_cast<size_t>(n);
  }
  return true;
}

// Worker thread entry point.
//
// `arg` must point to the Queue<int> of connected descriptors. The worker
// loops forever: it pops a descriptor, echoes every line received on it back
// to the peer, and closes the descriptor once the peer sends nothing more
// (readLine returns an empty string) or a write fails.
//
// NOTE(review): writing to a peer that already closed raises SIGPIPE, whose
// default action terminates the process; the process must ignore SIGPIPE for
// the write error path here to be reached.
void * handleRequest(void * arg){
  Queue<int> * Q = static_cast< Queue<int> *>(arg);
  while(1){
    int conn = Q->Pop();
    while(1){
      std::string line = readLine(conn);
      if(line.empty()){
        break;
      }
      if(!writeAll(conn, line.data(), line.size())){
        break;
      }
    }
    close(conn);
  }
  return NULL;  // never reached; keeps the pthread signature satisfied
}

// Starts the echo server: listens on port 8080 (backlog 128), spawns a fixed
// pool of worker threads running handleRequest, then accepts connections
// forever and hands each one to the workers through a bounded queue.
//
// Returns 1 if the listening socket could not be set up or no worker thread
// could be started; otherwise it never returns.
int main(){
  const int numThread = 10;

  // A write to a connection whose peer already went away raises SIGPIPE, and
  // the default action would terminate the whole server. Ignoring it turns
  // that case into an ordinary write() error.
  signal(SIGPIPE, SIG_IGN);

  ServerSocket sock(8080 , 128);
  int r = sock.Listen();
  std::cout << r << std::endl;
  if(r != 0){
    // Accepting on a socket that never started listening can only fail.
    return 1;
  }

  Queue<int> Q(numThread);
  int started = 0;
  for(int i = 0 ; i < numThread ; i++){
    pthread_t thread;
    const int err = pthread_create(&thread , NULL , handleRequest , &Q);
    if(err != 0){
      std::cerr << "pthread_create failed: " << err << std::endl;
      continue;
    }
    // Workers loop forever and are never joined.
    pthread_detach(thread);
    started++;
  }
  if(started == 0){
    return 1;
  }

  while(1){
    int conn = sock.Accept();
    if(conn < 0){
      // Accept() already reported the error; a failed accept is not a
      // connection and must not be queued for the workers.
      continue;
    }
    int res = Q.Push(conn);
    if(res != 0){
      std::cerr << "Queue is full" << std::endl;
      close(conn);
    }
  }
}

Queue.h

#include <pthread.h>

// Fixed-capacity FIFO queue stored in a circular array and guarded by a
// pthread mutex, with a condition variable used to wake consumers.
//
// Push() does not block: it returns -1 when the queue is full and 0 on
// success. Pop() blocks while the queue is empty.
//
// T must be default-constructible and assignable because the storage is
// allocated with new T[capacity].
template<typename T> class Queue{
  public:
    // Creates an empty queue able to hold `capacity` elements.
    Queue(int capacity)
      :capacity_(capacity) , size_(0), head_(0) , tail_(0){
        pthread_mutex_init(&mutex_ , NULL);
        pthread_cond_init(&not_empty, NULL);
        buffer_ = new T[capacity];
      }
    // Releases the storage and the synchronisation objects.
    ~Queue(){
      delete[] buffer_;
      pthread_mutex_destroy(&mutex_);
      pthread_cond_destroy(&not_empty);
    }
    int Push(T element);
    T Pop();
  private:
    // Non-copyable: a copy would share buffer_ (double delete[]) and
    // duplicate the raw mutex/condition objects. Declared but intentionally
    // never defined.
    Queue(const Queue &);
    Queue & operator=(const Queue &);

    pthread_mutex_t mutex_;    // protects every field below
    pthread_cond_t not_empty;  // signalled by Push() after adding an element
    int capacity_;             // number of slots in buffer_
    int size_;                 // number of elements currently stored
    int head_;                 // index of the next element to pop
    int tail_;                 // index of the next free slot to push into
    T * buffer_;               // circular storage, capacity_ slots
};


// Appends `element` at the tail of the queue without blocking.
//
// Returns 0 when the element was stored (one waiter on not_empty is then
// signalled, after the mutex has been released) and -1 when the queue
// already holds capacity_ elements.
template<typename T>
int Queue<T>::Push(T element){
  pthread_mutex_lock(&mutex_);
  const bool has_room = size_ < capacity_;
  if(has_room){
    buffer_[tail_] = element;
    // Advance the write index, wrapping to the start of the array.
    tail_ = (tail_ + 1 == capacity_) ? 0 : tail_ + 1;
    ++size_;
  }
  pthread_mutex_unlock(&mutex_);

  if(!has_room){
    return -1;
  }
  pthread_cond_signal(&not_empty);
  return 0;
}

// Removes and returns the element at the head of the queue.
//
// Blocks on the not_empty condition for as long as the queue holds no
// elements; the emptiness test is repeated after every wake-up.
template<typename T>
T Queue<T>::Pop(){
  T front;
  pthread_mutex_lock(&mutex_);
  while(size_ == 0){
    pthread_cond_wait(&not_empty , &mutex_);
  }
  front = buffer_[head_];
  // Advance the read index, wrapping to the start of the array.
  head_ = (head_ + 1 == capacity_) ? 0 : head_ + 1;
  --size_;
  pthread_mutex_unlock(&mutex_);
  return front;
}

ServerSocket.h

// Owns a listening stream-socket descriptor for a given port.
//
// The descriptor is created by Listen() and closed by Close(), which the
// destructor also calls.
class ServerSocket{
  public:
    // Remembers the port and the listen backlog; no socket is created yet
    // (listenfd_ starts at -1).
    ServerSocket(int port , int backlog)
      :port_(port), backlog_(backlog), listenfd_(-1){}
    ~ServerSocket(){
      Close();
    }
    // Creates, binds and starts listening on the socket. 0 on success, -1 on
    // failure.
    int Listen();
    // Accepts one connection. Returns its descriptor, or -1 on failure.
    int Accept();
    // Closes the listening descriptor if there is one. 0 on success, -1 on
    // failure.
    int Close();
  private:
    // Non-copyable: two objects holding the same descriptor would both close
    // it. Declared but intentionally never defined.
    ServerSocket(const ServerSocket &);
    ServerSocket & operator=(const ServerSocket &);

    int port_;      // port number passed to getaddrinfo as the service
    int backlog_;   // backlog argument passed to listen()
    int listenfd_;  // listening descriptor, -1 until Listen() succeeds
};

ServerSocket.cpp

#include <iostream>
#include <sstream>
#include <string>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>

#include "ServerSocket.h"

using namespace std;

// Resolves the local address for port_, binds a stream socket to the first
// candidate that works and puts it into the listening state.
//
// Returns 0 on success (the descriptor is stored in listenfd_) and -1 on
// failure, after printing a diagnostic. No descriptor or address list is
// leaked on any failure path.
//
// NOTE(review): hints.ai_flags stays 0 and the node argument is NULL; per the
// getaddrinfo documentation this yields loopback addresses, so the server is
// reachable from the local host only. Set AI_PASSIVE if remote clients are
// meant to connect -- confirm the intent.
int ServerSocket::Listen(){
  struct addrinfo hints;
  memset(&hints , 0 , sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;

  // getaddrinfo takes the service (port) as a string.
  std::stringstream ss;
  ss << port_;

  struct addrinfo *res = NULL;
  const int gai = getaddrinfo(NULL , ss.str().c_str(), &hints , &res);
  if(gai != 0){
    // On failure `res` is not filled in, so it must not be walked or freed.
    std::cerr << "ServerSocket::Listen() getaddrinfo: "
              << gai_strerror(gai) << std::endl;
    return -1;
  }

  int sockfd = -1;
  struct addrinfo *rp;
  for(rp = res ; rp != NULL ; rp = rp->ai_next){
    sockfd = socket(rp->ai_family , rp->ai_socktype , rp->ai_protocol);
    if(sockfd == -1)
      continue;
    if(bind(sockfd , rp->ai_addr , rp->ai_addrlen) == 0)
      break;
    close(sockfd);
    sockfd = -1;
  }
  // Record the outcome before the list (and therefore rp) is released.
  const bool bound = (rp != NULL);
  freeaddrinfo(res);
  if(!bound){
    std::cerr << "ServerSocket::Listen() Bind Fail" << std::endl;
    return -1;
  }

  if(listen(sockfd , backlog_) == -1){
    perror("ServerSocket::Listen()");  // report before close() can touch errno
    close(sockfd);
    return -1;
  }
  listenfd_ = sockfd;
  return 0;
}

// Accepts one pending connection on the listening descriptor; the peer
// address is not requested.
//
// Returns the connected descriptor, or -1 after printing the error when
// accept() fails.
int ServerSocket::Accept(){
  const int client = accept(listenfd_, NULL , NULL);
  if(client >= 0){
    return client;
  }
  perror("ServerSocket::Accept()");
  return -1;
}

// Closes the listening descriptor, if there is one.
//
// Returns 0 when nothing was open or the close succeeded, -1 (after printing
// the error) when close() failed. Safe to call repeatedly: the stored
// descriptor is forgotten before closing, so a later call -- including the
// one made by the destructor -- can never close the same number twice, which
// in a multithreaded server might by then belong to another connection.
int ServerSocket::Close(){
  if(listenfd_ < 0){
    return 0;
  }
  const int fd = listenfd_;
  listenfd_ = -1;
  if(close(fd) == -1){
    perror("ServerSocket::Close()");
    return -1;
  }
  return 0;
}