▼ 2011/11/24(木) あり?なし?
細かいところはおいておいて、
とりあえず、こういう非同期I/Oな実装ってあり?なし?
とりあえず、こういう非同期I/Oな実装ってあり?なし?
#include <sys/types.h> #include <sys/socket.h> #include <sys/time.h> #include <sys/queue.h> #include <sys/param.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <event.h> #include <signal.h> #include <netdb.h> #include <netinet/in.h> #include <pthread.h> #include <unistd.h> #include <errno.h> #include <time.h> #include <semaphore.h> #include <stdint.h> #define THREAD_POOL_NUM 4 #define NOTIFY_TYPE_STOP 1 #define NOTIFY_TYPE_TASK_DONE 2 #define THREAD_BUFFER_MANAGER_GLOW_ON 1 #define THREAD_BUFFER_MANAGER_GLOW_OFF 2 #define RESPONSE_BODY "<html>\n" \ " <head>\n" \ " </head>\n" \ " <body>\n" \ " hello world !!\n" \ " </body>\n" \ "</html>" #define RESPONSE_HEADER "HTTP/1.1 200 OK\r\n" \ "Content-Type: text/html\r\n" \ "Content-Length: %d\r\n\r\n" enum task_type { TASK_TYPE_ACCEPT = 1, TASK_TYPE_READ_DATA, TASK_TYPE_WRITE_DATA, }; enum queue_mode { THREAD_QUEUE_MODE_NOWAIT = 1, THREAD_QUEUE_MODE_WAIT, }; struct alloc_ptr { LIST_ENTRY(alloc_ptr) next; /* (sizeof(buffer_t) + buffer_size) * unit_count */ }; typedef struct alloc_ptr alloc_ptr_t; struct buffer { LIST_ENTRY(buffer) next; /* buffer_size */ }; typedef struct buffer buffer_t; struct buffer_manager { int glow; LIST_HEAD(alloc_ptr_head, alloc_ptr) alloc_ptr_head; LIST_HEAD(free_buffer_head, buffer) free_buffer_head; LIST_HEAD(used_buffer_head, buffer) used_buffer_head; size_t buffer_size; int unit_count; pthread_mutex_t lock; }; struct queue_data { void *data; void (*free_cb)(void *free_args, void *data); void *free_args; TAILQ_ENTRY(queue_data) next; }; struct queue { enum queue_mode mode; pthread_mutex_t lock; struct buffer_manager *buffer_manager; TAILQ_HEAD(queue_data_head, queue_data) queue_data_head; int current_queue_size; int max_queue_size; sem_t enqueue_sem; sem_t dequeue_sem; int dequeue_cancel; int queue_full_count; }; struct client_task { enum task_type type; int listen_sd; int sd; struct event read; char buffer[65535]; union { struct sockaddr_in in; struct sockaddr_in6 in6; } remote; socklen_t socklen; struct buffer_manager *task_buffer; struct queue *task_req_queue; }; struct io_thread { struct event_base *event_base; pthread_t io_thread; int listen_sd; int notify_read; struct event listen; struct event notify; struct buffer_manager *task_buffer; struct queue *task_req_queue; struct queue *task_res_queue; }; struct pool_thread { int run; pthread_t pool_thread; int notify_write; struct queue *task_req_queue; struct queue *task_res_queue; }; struct aio { struct event_base *event_base; struct event sigterm; struct event sigint; struct io_thread io_thread; struct pool_thread pool_thread[THREAD_POOL_NUM]; int notify_write; }; static int allocate_buffer( struct buffer_manager *buffer_manager, alloc_ptr_t **alloc_ptr) { alloc_ptr_t *new = NULL; buffer_t *tmp_buffer; char *tmp_ptr; int i; printf("allocate buffer\n"); if (alloc_ptr) { *alloc_ptr = NULL; } new = malloc(sizeof(alloc_ptr_t) + ((sizeof(buffer_t) + buffer_manager->buffer_size) * buffer_manager->unit_count)); if (new == NULL) { return 1; } LIST_INSERT_HEAD(&buffer_manager->alloc_ptr_head, new, next); tmp_ptr = (char *)new + sizeof(alloc_ptr_t); for (i = 0; i < buffer_manager->unit_count; i++) { tmp_buffer = (buffer_t *)tmp_ptr; LIST_INSERT_HEAD(&buffer_manager->free_buffer_head, tmp_buffer, next); tmp_ptr += buffer_manager->buffer_size + sizeof(buffer_t); } if (alloc_ptr) { *alloc_ptr = new; } return 0; } static int buffer_manager_create( struct buffer_manager **buffer_manager, int glow, size_t buffer_size, int unit_count) { alloc_ptr_t *new_alloc_ptr = NULL; struct buffer_manager *new = NULL; *buffer_manager = NULL; buffer_size = (buffer_size - (buffer_size & 3)) + 4; new = malloc(sizeof(struct buffer_manager)); if (new == NULL) { goto fail; } memset(new, 0, sizeof(struct buffer_manager)); LIST_INIT(&new->alloc_ptr_head); LIST_INIT(&new->free_buffer_head); new->buffer_size = buffer_size; new->unit_count = unit_count; if (allocate_buffer(new, &new_alloc_ptr)) { goto fail; } if (pthread_mutex_init(&new->lock, NULL)) { goto fail; } LIST_INIT(&new->used_buffer_head); new->glow = glow; *buffer_manager = new; return 0; fail: free(new_alloc_ptr); free(new); return 1; } static int buffer_manager_destroy( struct buffer_manager *buffer_manager) { alloc_ptr_t *alloc_ptr, *alloc_ptr_next; alloc_ptr = LIST_FIRST(&buffer_manager->alloc_ptr_head); while (alloc_ptr != NULL) { alloc_ptr_next = LIST_NEXT(alloc_ptr, next); LIST_REMOVE(alloc_ptr, next); free(alloc_ptr); alloc_ptr = alloc_ptr_next; } pthread_mutex_destroy(&buffer_manager->lock); free(buffer_manager); return 0; } static int buffer_manager_get( struct buffer_manager *buffer_manager, void **data) { int error = 0; buffer_t *free_buffer; *data = NULL; if (pthread_mutex_lock(&buffer_manager->lock)) { abort(); } free_buffer = LIST_FIRST(&buffer_manager->free_buffer_head); if (free_buffer == NULL) { if (buffer_manager->glow == THREAD_BUFFER_MANAGER_GLOW_ON) { if (allocate_buffer(buffer_manager, NULL)) { errno = ENOBUFS; error = 1; goto last; } free_buffer = LIST_FIRST(&buffer_manager->free_buffer_head); } else { errno = ENOBUFS; error = 1; goto last; } } LIST_REMOVE(free_buffer, next); LIST_INSERT_HEAD(&buffer_manager->used_buffer_head, free_buffer, next); *data = (char *)free_buffer + sizeof(buffer_t); last: if (pthread_mutex_unlock(&buffer_manager->lock)) { abort(); } return error; } static int buffer_manager_put( struct buffer_manager *buffer_manager, void *data) { buffer_t *used_buffer; used_buffer = (buffer_t *)((char *)data - sizeof(buffer_t)); if (pthread_mutex_lock(&buffer_manager->lock)) { abort(); } LIST_REMOVE(used_buffer, next); LIST_INSERT_HEAD(&buffer_manager->free_buffer_head, used_buffer, next); if (pthread_mutex_unlock(&buffer_manager->lock)) { abort(); } return 0; } static int queue_create( struct queue **queue, enum queue_mode mode, int max_queue_size) { struct queue *new = NULL; struct buffer_manager *new_buffer_manager = NULL; *queue = NULL; new = malloc(sizeof(struct queue)); if (new == NULL) { goto fail; } memset(new, 0, sizeof(struct queue)); if (buffer_manager_create( &new_buffer_manager, THREAD_BUFFER_MANAGER_GLOW_OFF, sizeof(struct queue_data), max_queue_size)) { goto fail; } if (sem_init(&new->dequeue_sem, 0, 0) != 0) { goto fail; } if (sem_init(&new->enqueue_sem, 0, 0) != 0) { goto fail; } if (pthread_mutex_init(&new->lock, NULL) != 0) { goto fail; } TAILQ_INIT(&new->queue_data_head); new->buffer_manager = new_buffer_manager; new->mode = mode; new->max_queue_size = max_queue_size; *queue = new; return 0; fail: if (new_buffer_manager) { buffer_manager_destroy(new_buffer_manager); } free(new); return 1; } static int queue_destroy(struct queue *queue) { struct queue_data *queue_data, *queue_data_next; queue_data = TAILQ_FIRST(&queue->queue_data_head); while (queue_data) { queue_data_next = TAILQ_NEXT(queue_data, next); TAILQ_REMOVE(&queue->queue_data_head, queue_data, next); queue_data->free_cb(queue_data->free_args, queue_data->data); buffer_manager_put( queue->buffer_manager, (void *)queue_data); queue_data = queue_data_next; } buffer_manager_destroy(queue->buffer_manager); pthread_mutex_destroy(&queue->lock); free(queue); return 0; } static int queue_enqueue( struct queue *queue, void *data, void (*free_cb)(void *free_args, void *data), void *free_args) { struct queue_data *queue_data; if (pthread_mutex_lock(&queue->lock) != 0) { abort(); } while (1) { if (buffer_manager_get(queue->buffer_manager, (void *)&queue_data)) { if (errno == ENOBUFS) { queue->queue_full_count++; if (queue->mode & THREAD_QUEUE_MODE_NOWAIT) { errno = ENOBUFS; if (pthread_mutex_unlock(&queue->lock) != 0) { abort(); } return 1; } if (pthread_mutex_unlock(&queue->lock) != 0) { abort(); } if (sem_wait(&queue->enqueue_sem)) { abort(); } if (pthread_mutex_lock(&queue->lock) != 0) { abort(); } continue; } else { abort(); } } break; } queue_data->data = data; queue_data->free_cb = free_cb; queue_data->free_args = free_args; TAILQ_INSERT_TAIL(&queue->queue_data_head, queue_data, next); queue->current_queue_size++; if (pthread_mutex_unlock(&queue->lock) != 0) { abort(); } /* dequeue_dataのセマフォにpost */ if (sem_post(&queue->dequeue_sem)) { abort(); } return 0; } static int queue_dequeue( struct queue *queue, void **data) { struct queue_data *queue_data; if (queue->mode & THREAD_QUEUE_MODE_WAIT) { if (sem_wait(&queue->dequeue_sem)) { abort(); } } if (pthread_mutex_lock(&queue->lock) != 0) { abort(); } queue_data = TAILQ_FIRST(&queue->queue_data_head); if (queue_data == NULL) { if (queue->mode & THREAD_QUEUE_MODE_WAIT) { if (queue->dequeue_cancel) { queue->dequeue_cancel -= 1; } else { abort(); } } *data = NULL; } else { TAILQ_REMOVE(&queue->queue_data_head, queue_data, next); queue->current_queue_size--; *data = queue_data->data; if (buffer_manager_put( queue->buffer_manager, queue_data)) { abort(); } if (queue->mode & THREAD_QUEUE_MODE_WAIT || queue->current_queue_size == queue->max_queue_size -1) { if (sem_post(&queue->enqueue_sem)) { abort(); } } } if (pthread_mutex_unlock(&queue->lock) != 0) { abort(); } return 0; } static int queue_dequeue_cancel( struct queue *queue) { if (pthread_mutex_lock(&queue->lock) != 0) { abort(); } queue->dequeue_cancel += 1; if (pthread_mutex_unlock(&queue->lock) != 0) { abort(); } sem_post(&queue->dequeue_sem); return 0; } static void get_task(struct buffer_manager *task_buffer, struct client_task **task) { struct client_task *t; buffer_manager_get(task_buffer, (void *)&t); *task = t; } static void relase_task(void *args, void *data) { struct buffer_manager *task_buffer; task_buffer = args; buffer_manager_put(task_buffer, data); } static void client_data(int sd, short ev, void *args) { struct client_task *task; task = args; if (ev == EV_READ) { task->type = TASK_TYPE_READ_DATA; } else if (ev == EV_WRITE) { task->type = TASK_TYPE_WRITE_DATA; } queue_enqueue(task->task_req_queue, task, relase_task, task->task_buffer); } static void io_thread_on_connect(int sd, short ev, void *args) { struct io_thread *io_thread; struct client_task *task; io_thread = args; //printf("on connect\n"); get_task(io_thread->task_buffer, &task); task->type = TASK_TYPE_ACCEPT; task->task_buffer = io_thread->task_buffer; task->task_req_queue = io_thread->task_req_queue; task->listen_sd = io_thread->listen_sd; queue_enqueue(task->task_req_queue, task, relase_task, task->task_buffer); } static void io_thread_notify(int fd, short ev, void *args) { struct io_thread *io_thread; struct client_task *task; char type; //printf("on notify\n"); io_thread = args; read(fd, &type, 1); switch (type) { case NOTIFY_TYPE_STOP: printf("stop notify\n"); event_del(&io_thread->listen); event_del(&io_thread->notify); break; case NOTIFY_TYPE_TASK_DONE: //printf("task done notify\n"); queue_dequeue(io_thread->task_res_queue, (void *)&task); switch (task->type) { case TASK_TYPE_ACCEPT: //printf("accept done\n"); event_set(&io_thread->listen, task->listen_sd, EV_READ, io_thread_on_connect, io_thread); event_base_set(io_thread->event_base, &io_thread->listen); event_priority_set(&io_thread->listen, 0); event_add(&io_thread->listen, NULL); event_set(&task->read, task->sd, EV_READ, client_data, task); event_base_set(io_thread->event_base, &task->read); event_add(&task->read, NULL); break; case TASK_TYPE_READ_DATA: //printf("read length done\n"); event_set(&task->read, task->sd, EV_WRITE, client_data, task); event_base_set(io_thread->event_base, &task->read); event_add(&task->read, NULL); break; default: //printf("%d\n", task->type); abort(); } break; default: abort(); } } static int io_thread_stop(int notify_write) { char type = NOTIFY_TYPE_STOP; printf("send stop notify\n"); write(notify_write, &type, 1); } static void * io_thread_start(void *args) { int s = -1; struct addrinfo hints, *res=NULL; struct io_thread *io_thread; int reuse_addr = 1; printf("start io thread\n"); io_thread = args; io_thread->event_base = event_init(); event_base_priority_init(io_thread->event_base, 1); event_set(&io_thread->notify, io_thread->notify_read, EV_READ | EV_PERSIST, io_thread_notify, io_thread); event_base_set(io_thread->event_base, &io_thread->notify); event_add(&io_thread->notify, NULL); memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV | AI_PASSIVE; if (getaddrinfo("0.0.0.0", "10080", &hints, &res) != 0) { goto on_fail; } if ((s = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) { goto on_fail; } if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &reuse_addr, sizeof(reuse_addr))) { goto on_fail; } if (bind(s, res->ai_addr, res->ai_addrlen) < 0) { goto on_fail; } if (listen(s, THREAD_POOL_NUM) < 0) { goto on_fail; } freeaddrinfo(res); res = NULL; io_thread->listen_sd = s; event_set(&io_thread->listen, io_thread->listen_sd, EV_READ, io_thread_on_connect, io_thread); event_base_set(io_thread->event_base, &io_thread->listen); event_priority_set(&io_thread->listen, 0); event_add(&io_thread->listen, NULL); event_base_dispatch(io_thread->event_base); close(s); event_base_free(io_thread->event_base); printf("io thread finish\n"); return NULL; on_fail: if (res != NULL) freeaddrinfo(res); if (s >= 0) close(s); printf("error in io thread\n"); return NULL; } static int pool_thread_task_done(int notify_write) { char type = NOTIFY_TYPE_TASK_DONE; write(notify_write, &type, 1); return 0; } static void pool_thread_stop(struct pool_thread *pool_thread) { pool_thread->run = 0; queue_dequeue_cancel(pool_thread->task_req_queue); } static void * pool_thread_start(void *args) { struct pool_thread *pool_thread; struct client_task *task; char type; int len; char response[8192]; printf("start pool thread\n"); pool_thread = args; while (pool_thread->run) { queue_dequeue(pool_thread->task_req_queue, (void *)&task); if (task == NULL) { break; } switch (task->type) { case TASK_TYPE_ACCEPT: //printf("do accept\n"); task->socklen = sizeof(task->remote); task->sd = accept(task->listen_sd, (struct sockaddr *)&task->remote, &task->socklen); queue_enqueue(pool_thread->task_res_queue, task, relase_task, task->task_buffer); pool_thread_task_done(pool_thread->notify_write); break; case TASK_TYPE_READ_DATA: if ((len = read(task->sd, task->buffer, sizeof(task->buffer) - 1)) <= 0) { //printf("failed in read\n"); close(task->sd); relase_task(task->task_buffer , task); break; } task->buffer[len] = '\0'; queue_enqueue(pool_thread->task_res_queue, task, relase_task, task->task_buffer); pool_thread_task_done(pool_thread->notify_write); break; case TASK_TYPE_WRITE_DATA: len = snprintf(response, sizeof(response), RESPONSE_HEADER RESPONSE_BODY, sizeof(RESPONSE_BODY)); //printf("res - %s\n", response); write(task->sd, response, len); close(task->sd); relase_task(task->task_buffer , task); break; default: abort(); } } printf("finish pool thread\n"); return NULL; } static void terminate(int fd, short event, void *args) { struct aio *aio; int i; printf("catch signal\n"); aio = args; for (i = 0; i < sizeof(aio->pool_thread)/sizeof(aio->pool_thread[0]); i++) { pool_thread_stop(&aio->pool_thread[i]); } io_thread_stop(aio->notify_write); signal_del(&aio->sigterm); signal_del(&aio->sigint); } int main(int argc, char *argv[]) { int i; struct aio aio; struct buffer_manager *task_buffer; struct queue *task_req_queue; struct queue *task_res_queue; int pipe_fd[2]; sigset_t sig_set; memset(&aio, 0, sizeof(aio)); buffer_manager_create( &task_buffer, THREAD_BUFFER_MANAGER_GLOW_ON, sizeof(struct client_task), 10000); queue_create( &task_req_queue, THREAD_QUEUE_MODE_WAIT, 10000); queue_create( &task_res_queue, THREAD_QUEUE_MODE_WAIT, 10000); pipe(pipe_fd); aio.event_base = event_init(); aio.notify_write = pipe_fd[1]; signal_set(&aio.sigterm, SIGTERM, terminate, &aio); event_base_set(aio.event_base, &aio.sigterm); signal_add(&aio.sigterm, NULL); signal_set(&aio.sigint, SIGINT, terminate, &aio); event_base_set(aio.event_base, &aio.sigint); signal_add(&aio.sigint, NULL); sigfillset(&sig_set); pthread_sigmask(SIG_BLOCK, &sig_set, NULL); aio.io_thread.task_buffer = task_buffer; aio.io_thread.task_req_queue = task_req_queue; aio.io_thread.task_res_queue = task_res_queue; aio.io_thread.notify_read = pipe_fd[0]; if (pthread_create(&aio.io_thread.io_thread, NULL, io_thread_start, &aio.io_thread)) { return 1; } for (i = 0; i < sizeof(aio.pool_thread)/sizeof(aio.pool_thread[0]); i++) { aio.pool_thread[i].task_req_queue = task_req_queue; aio.pool_thread[i].task_res_queue = task_res_queue; aio.pool_thread[i].notify_write = pipe_fd[1]; aio.pool_thread[i].run = 1; if (pthread_create(&aio.pool_thread[i].pool_thread, NULL, pool_thread_start, &aio.pool_thread[i])) { return 1; } } pthread_sigmask(SIG_UNBLOCK, &sig_set, NULL); event_base_dispatch(aio.event_base); pthread_join(aio.io_thread.io_thread, NULL); for (i = 0; i < sizeof(aio.pool_thread)/sizeof(aio.pool_thread[0]); i++) { pthread_join(aio.pool_thread[i].pool_thread, NULL); } close(pipe_fd[0]); close(pipe_fd[1]); queue_destroy(task_req_queue); queue_destroy(task_res_queue); buffer_manager_destroy(task_buffer); event_base_free(aio.event_base); return 0; }
1: yoshid 2011年11月28日(月) 午後2時21分
に日本語でおk
2: poti 2011年11月29日(火) 午前11時27分
どこをどう見ても日本語だよ!