ようこそゲストさん

無能日記

2011/11/24(木) あり?なし?

はてブ 2011/11/24 18:03 その他poti
細かいところはおいておいて、
とりあえず、こういう非同期I/Oな実装ってあり?なし?
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/queue.h>
#include <sys/param.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <event.h>
#include <signal.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <semaphore.h>
#include <stdint.h>

/* Number of worker threads (size of aio.pool_thread[]); also passed as the listen() backlog. */
#define THREAD_POOL_NUM 4

/* One-byte message types written to the notify pipe and decoded by io_thread_notify(). */
#define NOTIFY_TYPE_STOP	1 
#define NOTIFY_TYPE_TASK_DONE	2

/*
 * Values for buffer_manager.glow: with ON, buffer_manager_get() allocates
 * another slab when the free list is empty; with OFF it fails with ENOBUFS.
 */
#define THREAD_BUFFER_MANAGER_GLOW_ON	1
#define THREAD_BUFFER_MANAGER_GLOW_OFF	2

/* Fixed HTML document used as the body of every response. */
#define RESPONSE_BODY "<html>\n" \
		      "  <head>\n" \
		      "  </head>\n" \
		      "  <body>\n" \
		      "  hello world !!\n" \
		      "  </body>\n" \
		      "</html>"

/*
 * printf-style template for the status line and headers; its single %d
 * conversion receives the Content-Length value. The pool thread concatenates
 * this with RESPONSE_BODY to form one format string.
 */
#define RESPONSE_HEADER "HTTP/1.1 200 OK\r\n" \
		      	"Content-Type: text/html\r\n" \
		        "Content-Length: %d\r\n\r\n"

/* Operation a client_task is queued for; pool_thread_start() switches on it. */
enum task_type {
	TASK_TYPE_ACCEPT = 1,	/* accept() a connection on task->listen_sd */
	TASK_TYPE_READ_DATA,	/* read() from task->sd into task->buffer */
	TASK_TYPE_WRITE_DATA,	/* write the canned response to task->sd and close it */
};

/*
 * Blocking behaviour of a queue. The queue code tests these values with
 * bitwise '&', which works because the two values occupy distinct bits.
 */
enum queue_mode {
        THREAD_QUEUE_MODE_NOWAIT = 1,	/* enqueue fails with ENOBUFS when full; dequeue does not wait */
        THREAD_QUEUE_MODE_WAIT,		/* enqueue and dequeue block on the queue's semaphores */
};

/*
 * Header placed at the start of each malloc()ed slab. allocate_buffer() links
 * it into buffer_manager.alloc_ptr_head so buffer_manager_destroy() can free
 * every slab.
 */
struct alloc_ptr {
	LIST_ENTRY(alloc_ptr) next;
	/* followed in memory by: (sizeof(buffer_t) + buffer_size) * unit_count bytes */
};
typedef struct alloc_ptr alloc_ptr_t;

/*
 * Header preceding each pooled buffer inside a slab. The pointer handed to
 * callers is the address just past this header; buffer_manager_put() subtracts
 * sizeof(buffer_t) to get back to it.
 */
struct buffer {
	LIST_ENTRY(buffer) next;	/* link in either the free or the used list */
	/* followed in memory by buffer_size payload bytes */
};
typedef struct buffer buffer_t;

/* Pool of fixed-size buffers carved out of one or more malloc()ed slabs. */
struct buffer_manager {
	int glow;	/* THREAD_BUFFER_MANAGER_GLOW_ON/OFF: whether get() may allocate another slab */
	LIST_HEAD(alloc_ptr_head, alloc_ptr) alloc_ptr_head;	/* every slab allocated so far */
	LIST_HEAD(free_buffer_head, buffer) free_buffer_head;	/* buffers available to get() */
	LIST_HEAD(used_buffer_head, buffer) used_buffer_head;	/* buffers currently handed out */
	size_t buffer_size;	/* payload bytes per buffer, after rounding in buffer_manager_create() */
	int unit_count;		/* number of buffers carved from each slab */
	pthread_mutex_t lock;	/* taken by buffer_manager_get()/put() around list updates */
};

/* One queued item; nodes are taken from the queue's own buffer_manager. */
struct queue_data {
        void *data;		/* payload pointer returned by queue_dequeue() */
	void (*free_cb)(void *free_args, void *data);	/* called by queue_destroy() for items still queued */
        void *free_args;	/* first argument passed to free_cb */
        TAILQ_ENTRY(queue_data) next;
};

/* Bounded FIFO shared between threads. */
struct queue {
        enum queue_mode mode;			/* tested with '&' against THREAD_QUEUE_MODE_* */
        pthread_mutex_t lock;			/* guards the list and the counters below */
        struct buffer_manager *buffer_manager;	/* non-growing pool of max_queue_size queue_data nodes */
        TAILQ_HEAD(queue_data_head, queue_data) queue_data_head;
        int current_queue_size;			/* incremented on enqueue, decremented on dequeue */
        int max_queue_size;			/* capacity given to queue_create() */
        sem_t enqueue_sem;			/* waited on by enqueue when no node is free; posted by dequeue */
        sem_t dequeue_sem;			/* posted per enqueued item and per cancel; waited on by dequeue in WAIT mode */
        int dequeue_cancel;			/* outstanding queue_dequeue_cancel() requests not yet consumed */
        int queue_full_count;			/* times enqueue found no free node */
};

/*
 * Unit of work passed between the io thread and the pool threads. Allocated
 * from a buffer_manager via get_task() and returned via relase_task().
 */
struct client_task {
        enum task_type type;	/* operation the pool thread should perform */
	int listen_sd;		/* listening socket to accept() on */
	int sd;			/* result of accept(); the client socket used for read/write */
	struct event read;	/* libevent event for sd; registered for EV_READ and later for EV_WRITE */
	char buffer[65535];	/* bytes read from sd, NUL-terminated by the pool thread */
	union {
		struct sockaddr_in in;
		struct sockaddr_in6 in6;
	} remote;		/* peer address filled in by accept() */
	socklen_t socklen;	/* in/out length argument for accept() */
	struct buffer_manager *task_buffer;	/* pool this task belongs to; passed to relase_task() */
	struct queue *task_req_queue;		/* queue on which the task is submitted to the pool */
};

/* State of the single event-loop thread that owns the listen socket and client events. */
struct io_thread {
	struct event_base *event_base;	/* created inside io_thread_start() */
	pthread_t io_thread; 
        int listen_sd;			/* listening socket, set once bind()/listen() succeed */
        int notify_read;		/* read end of the notify pipe */
        struct event listen;		/* one-shot EV_READ on listen_sd; re-armed after each accept completes */
        struct event notify;		/* persistent EV_READ on notify_read */
	struct buffer_manager *task_buffer;	/* pool that client_task objects come from */
	struct queue *task_req_queue;	/* io thread -> pool threads */
	struct queue *task_res_queue;	/* pool threads -> io thread */
};

/* Per-worker state for one pool thread. */
struct pool_thread {
	int run;		/* loop condition of pool_thread_start(); cleared by pool_thread_stop() */
	pthread_t pool_thread;
        int notify_write;	/* write end of the notify pipe, used to report finished tasks */
	struct queue *task_req_queue;	/* tasks to execute */
	struct queue *task_res_queue;	/* finished tasks handed back to the io thread */
};

/* Top-level program state owned by main(). */
struct aio {
	struct event_base *event_base;	/* main thread's loop; only carries the signal events */
        struct event sigterm;
        struct event sigint;
        struct io_thread io_thread;
        struct pool_thread pool_thread[THREAD_POOL_NUM];
	int notify_write;		/* write end of the notify pipe, used by terminate() to stop the io thread */
};


/*
 * Allocate one more slab for the manager and put all of its buffers on the
 * free list.
 *
 * Slab layout: [alloc_ptr_t][buffer_t | payload][buffer_t | payload]...
 * with unit_count entries of (sizeof(buffer_t) + buffer_size) bytes each.
 *
 * buffer_manager: manager to extend; buffer_size and unit_count must be set.
 * alloc_ptr:      optional out-parameter receiving the new slab (NULL on failure).
 *
 * Returns 0 on success, 1 if unit_count is not positive, the slab size would
 * overflow size_t, or malloc() fails. The caller is responsible for locking.
 */
static int
allocate_buffer(
    struct buffer_manager *buffer_manager,
    alloc_ptr_t **alloc_ptr)
{
	alloc_ptr_t *slab;
	char *cursor;
	size_t stride;
	size_t count;
	size_t i;

	printf("allocate buffer\n");
	if (alloc_ptr) {
		*alloc_ptr = NULL;
	}
	/* A slab without buffers would let callers "grow" without gaining anything. */
	if (buffer_manager->unit_count <= 0) {
		return 1;
	}
	count = (size_t)buffer_manager->unit_count;
	stride = sizeof(buffer_t) + buffer_manager->buffer_size;
	/* Reject sizes whose arithmetic would wrap around. */
	if (stride < buffer_manager->buffer_size ||
	    count > (SIZE_MAX - sizeof(alloc_ptr_t)) / stride) {
		return 1;
	}
	slab = malloc(sizeof(alloc_ptr_t) + stride * count);
	if (slab == NULL) {
		return 1;
	}
	LIST_INSERT_HEAD(&buffer_manager->alloc_ptr_head, slab, next);
	cursor = (char *)slab + sizeof(alloc_ptr_t);
	for (i = 0; i < count; i++) {
		buffer_t *unit = (buffer_t *)cursor;

		LIST_INSERT_HEAD(&buffer_manager->free_buffer_head, unit, next);
		cursor += stride;
	}
	if (alloc_ptr) {
		*alloc_ptr = slab;
	}

	return 0;
}

/*
 * Create a buffer manager and pre-allocate its first slab.
 *
 * buffer_manager: out-parameter; set to the new manager, or NULL on failure.
 * glow:           THREAD_BUFFER_MANAGER_GLOW_ON to allow further slabs to be
 *                 allocated on demand, THREAD_BUFFER_MANAGER_GLOW_OFF otherwise.
 * buffer_size:    requested payload size per buffer; rounded up (see below).
 * unit_count:     number of buffers per slab.
 *
 * Returns 0 on success, 1 on failure.
 */
static int
buffer_manager_create(
    struct buffer_manager **buffer_manager,
    int glow,
    size_t buffer_size,
    int unit_count)
{
	/*
	 * Each buffer is followed directly by the next buffer_t header, and both
	 * the slab header and buffer_t consist of a single LIST_ENTRY (two
	 * pointers). Rounding the payload to a multiple of two pointers therefore
	 * keeps every header and every payload pointer-aligned; the previous
	 * 4-byte rounding did not on 64-bit targets.
	 */
	const size_t align = 2 * sizeof(void *);
	alloc_ptr_t *first_slab = NULL;
	struct buffer_manager *mgr = NULL;

	*buffer_manager = NULL;
	if (buffer_size > SIZE_MAX - (align - 1)) {
		goto fail;
	}
	buffer_size = (buffer_size + align - 1) / align * align;
	if (buffer_size == 0) {
		/* Keep payloads non-empty, as the old rounding always did. */
		buffer_size = align;
	}
	mgr = calloc(1, sizeof(*mgr));
	if (mgr == NULL) {
		goto fail;
	}
	LIST_INIT(&mgr->alloc_ptr_head);
	LIST_INIT(&mgr->free_buffer_head);
	LIST_INIT(&mgr->used_buffer_head);
	mgr->buffer_size = buffer_size;
	mgr->unit_count = unit_count;
	mgr->glow = glow;
	if (allocate_buffer(mgr, &first_slab)) {
		goto fail;
	}
	if (pthread_mutex_init(&mgr->lock, NULL)) {
		goto fail;
	}
	*buffer_manager = mgr;

	return 0;

fail:
	/* free(NULL) is a no-op, so both calls are safe on every path. */
	free(first_slab);
	free(mgr);

	return 1;
}

/*
 * Release every slab owned by the manager, then the manager itself.
 * Buffers still handed out become invalid. Always returns 0.
 */
static int
buffer_manager_destroy(
    struct buffer_manager *buffer_manager)
{
	alloc_ptr_t *slab;

	/* Pop slabs off the head until the list is empty. */
	while ((slab = LIST_FIRST(&buffer_manager->alloc_ptr_head)) != NULL) {
		LIST_REMOVE(slab, next);
		free(slab);
	}
	pthread_mutex_destroy(&buffer_manager->lock);
	free(buffer_manager);

	return 0;
}

/*
 * Take one buffer from the pool.
 *
 * buffer_manager: pool to take from.
 * data:           out-parameter; receives the payload pointer, or NULL on failure.
 *
 * If the free list is empty and the manager was created with
 * THREAD_BUFFER_MANAGER_GLOW_ON, another slab is allocated first.
 *
 * Returns 0 on success. Returns 1 with errno set to ENOBUFS when no buffer is
 * available. Aborts if the mutex cannot be locked or unlocked.
 */
static int
buffer_manager_get(
    struct buffer_manager *buffer_manager,
    void **data)
{
	buffer_t *slot;

	*data = NULL;
	if (pthread_mutex_lock(&buffer_manager->lock)) {
		abort();
	}
	slot = LIST_FIRST(&buffer_manager->free_buffer_head);
	if (slot == NULL &&
	    buffer_manager->glow == THREAD_BUFFER_MANAGER_GLOW_ON &&
	    allocate_buffer(buffer_manager, NULL) == 0) {
		/*
		 * Re-read the head; it may still be NULL if the new slab
		 * contributed no buffers, which is treated as exhaustion
		 * instead of being dereferenced.
		 */
		slot = LIST_FIRST(&buffer_manager->free_buffer_head);
	}
	if (slot != NULL) {
		LIST_REMOVE(slot, next);
		LIST_INSERT_HEAD(&buffer_manager->used_buffer_head, slot, next);
		*data = (char *)slot + sizeof(buffer_t);
	}
	if (pthread_mutex_unlock(&buffer_manager->lock)) {
		abort();
	}
	if (slot == NULL) {
		/* Set after unlocking so the unlock call cannot disturb it. */
		errno = ENOBUFS;
		return 1;
	}

	return 0;
}

/*
 * Give a buffer obtained from buffer_manager_get() back to the pool.
 *
 * data must be the exact payload pointer that get() returned; the buffer_t
 * header is located immediately before it. Always returns 0; aborts if the
 * mutex cannot be locked or unlocked.
 */
static int
buffer_manager_put(
    struct buffer_manager *buffer_manager,
    void *data)
{
	buffer_t *slot = (buffer_t *)((char *)data - sizeof(buffer_t));

	if (pthread_mutex_lock(&buffer_manager->lock) != 0) {
		abort();
	}
	/* Move the header from the used list to the front of the free list. */
	LIST_REMOVE(slot, next);
	LIST_INSERT_HEAD(&buffer_manager->free_buffer_head, slot, next);
	if (pthread_mutex_unlock(&buffer_manager->lock) != 0) {
		abort();
	}

	return 0;
}

/*
 * Create a bounded queue.
 *
 * queue:          out-parameter; set to the new queue, or NULL on failure.
 * mode:           THREAD_QUEUE_MODE_WAIT or THREAD_QUEUE_MODE_NOWAIT.
 * max_queue_size: capacity; also the size of the non-growing node pool.
 *
 * Returns 0 on success, 1 on failure. On failure everything that had been
 * initialised, including the semaphores, is released again.
 */
static int
queue_create(
    struct queue **queue,
    enum queue_mode mode,
    int max_queue_size) {
	struct queue *q = NULL;
	struct buffer_manager *nodes = NULL;
	int dequeue_sem_ready = 0;
	int enqueue_sem_ready = 0;

	*queue = NULL;
	q = calloc(1, sizeof(*q));
	if (q == NULL) {
		goto fail;
	}
	if (buffer_manager_create(
	    &nodes,
	    THREAD_BUFFER_MANAGER_GLOW_OFF,
	    sizeof(struct queue_data),
	    max_queue_size)) {
		goto fail;
	}
	if (sem_init(&q->dequeue_sem, 0, 0) != 0) {
		goto fail;
	}
	dequeue_sem_ready = 1;
	if (sem_init(&q->enqueue_sem, 0, 0) != 0) {
		goto fail;
	}
	enqueue_sem_ready = 1;
	if (pthread_mutex_init(&q->lock, NULL) != 0) {
		goto fail;
	}
	TAILQ_INIT(&q->queue_data_head);
	q->buffer_manager = nodes;
	q->mode = mode;
	q->max_queue_size = max_queue_size;
	*queue = q;

	return 0;

fail:
	if (enqueue_sem_ready) {
		sem_destroy(&q->enqueue_sem);
	}
	if (dequeue_sem_ready) {
		sem_destroy(&q->dequeue_sem);
	}
	if (nodes) {
		buffer_manager_destroy(nodes);
	}
	free(q);

	return 1;
}

/*
 * Destroy a queue. Items still queued are handed to their free_cb (when one
 * was supplied) before the node pool, semaphores, mutex and the queue itself
 * are released. No other thread may be using the queue. Always returns 0.
 */
static int
queue_destroy(struct queue *queue) {
	struct queue_data *item;

	while ((item = TAILQ_FIRST(&queue->queue_data_head)) != NULL) {
		TAILQ_REMOVE(&queue->queue_data_head, item, next);
		/* free_cb is caller-supplied and may legitimately be NULL. */
		if (item->free_cb != NULL) {
			item->free_cb(item->free_args, item->data);
		}
		buffer_manager_put(queue->buffer_manager, item);
	}
	buffer_manager_destroy(queue->buffer_manager);
	/* Both semaphores were set up by sem_init() in queue_create(). */
	sem_destroy(&queue->enqueue_sem);
	sem_destroy(&queue->dequeue_sem);
	pthread_mutex_destroy(&queue->lock);
	free(queue);

	return 0;
}

/*
 * Append an item to the queue.
 *
 * queue:     target queue.
 * data:      payload pointer later returned by queue_dequeue().
 * free_cb:   invoked by queue_destroy() if the item is still queued then.
 * free_args: first argument passed to free_cb.
 *
 * When no node is free: in NOWAIT mode returns 1 with errno == ENOBUFS; in
 * WAIT mode blocks on enqueue_sem until a dequeue posts it, then retries.
 * Returns 0 on success. Aborts on unexpected mutex/semaphore failures.
 */
static int
queue_enqueue(
    struct queue *queue,
    void *data,
    void (*free_cb)(void *free_args, void *data),
    void *free_args) {
	void *node = NULL;	/* avoids passing a struct queue_data ** as void ** */
	struct queue_data *queue_data;

	if (pthread_mutex_lock(&queue->lock) != 0) {
		abort();
	}
	while (buffer_manager_get(queue->buffer_manager, &node)) {
		if (errno != ENOBUFS) {
			abort();
		}
		queue->queue_full_count++;
		if (queue->mode & THREAD_QUEUE_MODE_NOWAIT) {
			if (pthread_mutex_unlock(&queue->lock) != 0) {
				abort();
			}
			errno = ENOBUFS;
			return 1;
		}
		/* Drop the lock while sleeping so consumers can make room. */
		if (pthread_mutex_unlock(&queue->lock) != 0) {
			abort();
		}
		/* A signal interrupting the wait is not an error; wait again. */
		while (sem_wait(&queue->enqueue_sem) != 0) {
			if (errno != EINTR) {
				abort();
			}
		}
		if (pthread_mutex_lock(&queue->lock) != 0) {
			abort();
		}
	}
	queue_data = node;
	queue_data->data = data;
	queue_data->free_cb = free_cb;
	queue_data->free_args = free_args;
	TAILQ_INSERT_TAIL(&queue->queue_data_head, queue_data, next);
	queue->current_queue_size++;
	if (pthread_mutex_unlock(&queue->lock) != 0) {
		abort();
	}
	/* Post to the dequeue semaphore so a waiting consumer wakes up. */
	if (sem_post(&queue->dequeue_sem)) {
		abort();
	}

	return 0;
}

/*
 * Remove the oldest item from the queue.
 *
 * queue: source queue.
 * data:  out-parameter; receives the item's payload pointer, or NULL when the
 *        queue is empty (NOWAIT mode) or the wait was cancelled with
 *        queue_dequeue_cancel() (WAIT mode).
 *
 * In WAIT mode the call blocks on dequeue_sem first. Always returns 0; aborts
 * on unexpected mutex/semaphore failures or if a WAIT-mode wake-up finds
 * neither an item nor a pending cancel.
 */
static int
queue_dequeue(
    struct queue *queue,
    void **data) {
	struct queue_data *queue_data;

	if (queue->mode & THREAD_QUEUE_MODE_WAIT) {
		/* A signal interrupting the wait is not an error; wait again. */
		while (sem_wait(&queue->dequeue_sem) != 0) {
			if (errno != EINTR) {
				abort();
			}
		}
	}
	if (pthread_mutex_lock(&queue->lock) != 0) {
		abort();
	}
	queue_data = TAILQ_FIRST(&queue->queue_data_head);
	if (queue_data == NULL) {
		if (queue->mode & THREAD_QUEUE_MODE_WAIT) {
			/* Woken without data: only valid as a cancel request. */
			if (queue->dequeue_cancel == 0) {
				abort();
			}
			queue->dequeue_cancel -= 1;
		}
		*data = NULL;
	} else {
		TAILQ_REMOVE(&queue->queue_data_head, queue_data, next);
		queue->current_queue_size--;
		*data = queue_data->data;
		if (buffer_manager_put(
		    queue->buffer_manager,
		    queue_data)) {
			abort();
		}
		if (queue->mode & THREAD_QUEUE_MODE_WAIT ||
		    queue->current_queue_size == queue->max_queue_size - 1) {
			/*
			 * enqueue_sem is posted on every WAIT-mode dequeue but
			 * only waited on when the queue is full, so its count
			 * can eventually saturate. A saturated semaphore has no
			 * sleeper to wake, so EOVERFLOW is harmless here.
			 */
			if (sem_post(&queue->enqueue_sem) != 0 &&
			    errno != EOVERFLOW) {
				abort();
			}
		}
	}
	if (pthread_mutex_unlock(&queue->lock) != 0) {
		abort();
	}

	return 0;
}

/*
 * Wake one thread blocked in queue_dequeue() without giving it data: records a
 * pending cancel and posts dequeue_sem. If items are still queued, the woken
 * thread receives an item and the cancel is consumed by a later wake-up that
 * finds the queue empty.
 *
 * Always returns 0; aborts on mutex/semaphore failure, like the other queue
 * operations.
 */
static int
queue_dequeue_cancel(
    struct queue *queue)
{
	if (pthread_mutex_lock(&queue->lock) != 0) {
		abort();
	}
	queue->dequeue_cancel += 1;
	if (pthread_mutex_unlock(&queue->lock) != 0) {
		abort();
	}
	/*
	 * A lost post would leave dequeue_cancel and the semaphore count out of
	 * step, so treat failure as fatal instead of ignoring it.
	 */
	if (sem_post(&queue->dequeue_sem) != 0) {
		abort();
	}

	return 0;
}

/*
 * Take a client_task from the pool.
 *
 * task_buffer: pool whose buffers are sized for struct client_task.
 * task:        out-parameter; receives the task, or NULL when the pool has no
 *              buffer available. The task's contents are not initialised.
 */
static void
get_task(struct buffer_manager *task_buffer, struct client_task **task) {
	/* Use a real void * so the pool writes through a matching pointer type. */
	void *slot = NULL;

	if (buffer_manager_get(task_buffer, &slot) != 0) {
		slot = NULL;
	}
	*task = slot;
}

/*
 * Return a task to its pool. Has the queue free_cb signature: args is the
 * struct buffer_manager the task came from, data is the task itself.
 */
static void
relase_task(void *args, void *data) {
	buffer_manager_put((struct buffer_manager *)args, data);
}

/*
 * libevent callback for a client socket. Translates the event that fired into
 * the matching task type and submits the task to the request queue. For any
 * other event value the task keeps its previous type.
 *
 * sd:   client socket (unused; the task carries it).
 * ev:   event flags reported by libevent.
 * args: the struct client_task registered with the event.
 */
static void
client_data(int sd, short ev, void *args) {
	struct client_task *task = args;

	switch (ev) {
	case EV_READ:
		task->type = TASK_TYPE_READ_DATA;
		break;
	case EV_WRITE:
		task->type = TASK_TYPE_WRITE_DATA;
		break;
	default:
		break;
	}
	queue_enqueue(task->task_req_queue, task, relase_task, task->task_buffer);
}

/*
 * libevent callback fired when the listen socket becomes readable. Queues an
 * ACCEPT task so a pool thread performs the accept(). The listen event is
 * one-shot and is re-armed by io_thread_notify() once that task completes.
 *
 * sd:   listen socket (unused; taken from io_thread).
 * ev:   event flags (unused).
 * args: the struct io_thread.
 */
static void
io_thread_on_connect(int sd, short ev, void *args) {
	struct io_thread *io_thread = args;
	struct client_task *task = NULL;

	get_task(io_thread->task_buffer, &task);
	if (task == NULL) {
		/*
		 * No task buffer available. Re-arm the listen event here, because
		 * no ACCEPT completion will arrive to do it; otherwise the server
		 * would silently stop accepting. The pending connection triggers
		 * another attempt on the next loop iteration.
		 */
		fprintf(stderr, "no task buffer for accept\n");
		event_add(&io_thread->listen, NULL);
		return;
	}
	task->type = TASK_TYPE_ACCEPT;
	task->task_buffer = io_thread->task_buffer;
	task->task_req_queue = io_thread->task_req_queue;
	task->listen_sd = io_thread->listen_sd;
	queue_enqueue(task->task_req_queue, task, relase_task, task->task_buffer);
}

/*
 * libevent callback for the notify pipe. Each byte is one message:
 *
 *   NOTIFY_TYPE_STOP       remove the listen/notify events and leave the loop.
 *   NOTIFY_TYPE_TASK_DONE  a pool thread finished a task; take it from the
 *                          response queue and register the next event for it.
 *
 * fd:   read end of the notify pipe.
 * ev:   event flags (unused).
 * args: the struct io_thread.
 *
 * Aborts on an unknown message or an unexpected task type.
 */
static void
io_thread_notify(int fd, short ev, void *args) {
	struct io_thread *io_thread = args;
	struct client_task *task;
	void *item = NULL;
	char type = 0;
	ssize_t nread;

	do {
		nread = read(fd, &type, 1);
	} while (nread < 0 && errno == EINTR);
	if (nread == 0) {
		/* Every writer is gone; a persistent event on EOF would spin. */
		event_del(&io_thread->notify);
		return;
	}
	if (nread < 0) {
		/* Nothing was read, so there is no message to act on. */
		return;
	}

	switch (type) {
	case NOTIFY_TYPE_STOP:
		printf("stop notify\n");
		event_del(&io_thread->listen);
		event_del(&io_thread->notify);
		/*
		 * Client events may still be pending, which would keep
		 * event_base_dispatch() running forever; ask the loop to exit.
		 */
		event_base_loopexit(io_thread->event_base, NULL);
		break;
	case NOTIFY_TYPE_TASK_DONE:
		queue_dequeue(io_thread->task_res_queue, &item);
		task = item;
		if (task == NULL) {
			/* Woken by a cancel, not by a finished task. */
			break;
		}
		switch (task->type) {
		case TASK_TYPE_ACCEPT:
			/* The accept has run; start listening for the next connection. */
			event_set(&io_thread->listen, task->listen_sd, EV_READ, io_thread_on_connect, io_thread);
			event_base_set(io_thread->event_base, &io_thread->listen);
			event_priority_set(&io_thread->listen, 0);
			event_add(&io_thread->listen, NULL);
			if (task->sd < 0) {
				/* accept() failed: there is no client socket to watch. */
				relase_task(task->task_buffer, task);
				break;
			}
			event_set(&task->read, task->sd, EV_READ, client_data, task);
			event_base_set(io_thread->event_base, &task->read);
			if (event_add(&task->read, NULL) != 0) {
				close(task->sd);
				relase_task(task->task_buffer, task);
			}
			break;
		case TASK_TYPE_READ_DATA:
			/* Request has been read; wait until the socket is writable. */
			event_set(&task->read, task->sd, EV_WRITE, client_data, task);
			event_base_set(io_thread->event_base, &task->read);
			if (event_add(&task->read, NULL) != 0) {
				close(task->sd);
				relase_task(task->task_buffer, task);
			}
			break;
		default:
			abort();
		}
		break;
	default:
		abort();
	}
}

/*
 * Ask the io thread to stop by writing NOTIFY_TYPE_STOP to the notify pipe.
 *
 * notify_write: write end of the notify pipe.
 *
 * Returns 0 when the byte was written, 1 otherwise. (The function previously
 * fell off the end without returning a value.)
 */
static int
io_thread_stop(int notify_write) {
	char type = NOTIFY_TYPE_STOP;
	ssize_t written;

	printf("send stop notify\n");
	do {
		written = write(notify_write, &type, 1);
	} while (written < 0 && errno == EINTR);

	return (written == 1) ? 0 : 1;
}

/*
 * Thread entry point for the io thread.
 *
 * Creates the thread's event base, registers the notify-pipe event, opens a
 * listening TCP socket on 0.0.0.0:10080, registers the listen event and runs
 * the event loop until it exits.
 *
 * args: the struct io_thread, with notify_read and the queues already set.
 *
 * Always returns NULL. On a setup failure the socket, address list and event
 * base are released before returning.
 */
static void *
io_thread_start(void *args) {
	struct io_thread *io_thread = args;
	struct addrinfo hints, *res = NULL;
	int s = -1;
	int reuse_addr = 1;
	int notify_added = 0;

	printf("start io thread\n");
	io_thread->event_base = event_init();
	if (io_thread->event_base == NULL) {
		goto on_fail;
	}
	event_base_priority_init(io_thread->event_base, 1);

	event_set(&io_thread->notify, io_thread->notify_read, EV_READ | EV_PERSIST, io_thread_notify, io_thread);
	event_base_set(io_thread->event_base, &io_thread->notify);
	if (event_add(&io_thread->notify, NULL) != 0) {
		goto on_fail;
	}
	notify_added = 1;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV | AI_PASSIVE;
	if (getaddrinfo("0.0.0.0", "10080", &hints, &res) != 0) {
		res = NULL;
		goto on_fail;
	}
	if ((s = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) {
		goto on_fail;
	}
	if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &reuse_addr, sizeof(reuse_addr))) {
		goto on_fail;
	}
	if (bind(s, res->ai_addr, res->ai_addrlen) < 0) {
		goto on_fail;
	}
	if (listen(s, THREAD_POOL_NUM) < 0) {
		goto on_fail;
	}
	freeaddrinfo(res);
	res = NULL;
	io_thread->listen_sd = s;
	/* One-shot: re-armed by io_thread_notify() after each accept completes. */
	event_set(&io_thread->listen, io_thread->listen_sd, EV_READ, io_thread_on_connect, io_thread);
	event_base_set(io_thread->event_base, &io_thread->listen);
	event_priority_set(&io_thread->listen, 0);
	if (event_add(&io_thread->listen, NULL) != 0) {
		goto on_fail;
	}

	event_base_dispatch(io_thread->event_base);

	close(s);
	event_base_free(io_thread->event_base);

	printf("io thread finish\n");

	return NULL;

on_fail:
	if (res != NULL) {
		freeaddrinfo(res);
	}
	if (s >= 0) {
		close(s);
	}
	if (io_thread->event_base != NULL) {
		/* Detach the notify event before freeing the base it belongs to. */
		if (notify_added) {
			event_del(&io_thread->notify);
		}
		event_base_free(io_thread->event_base);
		io_thread->event_base = NULL;
	}

	printf("error in io thread\n");

	return NULL;
}

/*
 * Tell the io thread that a finished task is waiting on the response queue by
 * writing NOTIFY_TYPE_TASK_DONE to the notify pipe.
 *
 * notify_write: write end of the notify pipe.
 *
 * Returns 0 when the byte was written, 1 otherwise. A lost notification means
 * the io thread never picks the task up, so interrupted writes are retried.
 */
static int
pool_thread_task_done(int notify_write) {
	char type = NOTIFY_TYPE_TASK_DONE;
	ssize_t written;

	do {
		written = write(notify_write, &type, 1);
	} while (written < 0 && errno == EINTR);

	return (written == 1) ? 0 : 1;
}

/*
 * Request that a pool thread finish: clear its run flag, then post a cancel on
 * the request queue so a thread blocked in queue_dequeue() wakes up.
 */
static void
pool_thread_stop(struct pool_thread *pool_thread) {
	struct queue *requests;

	pool_thread->run = 0;
	requests = pool_thread->task_req_queue;
	queue_dequeue_cancel(requests);
}

/*
 * Thread entry point for a pool (worker) thread.
 *
 * Repeatedly takes a task from the request queue and performs the blocking
 * operation named by task->type:
 *
 *   TASK_TYPE_ACCEPT      accept() on listen_sd, hand the task back.
 *   TASK_TYPE_READ_DATA   read() the request, hand the task back; on EOF or
 *                         error close the socket and release the task.
 *   TASK_TYPE_WRITE_DATA  send the canned response, close, release the task.
 *
 * The loop ends when run is cleared or a dequeue returns NULL (cancelled).
 *
 * args: the struct pool_thread for this worker. Always returns NULL.
 */
static void *
pool_thread_start(void *args) {
	struct pool_thread *pool_thread = args;
	struct client_task *task;
	void *item;
	char response[8192];
	int len;
	ssize_t n;
	size_t sent;

	printf("start pool thread\n");
	while (pool_thread->run) {
		item = NULL;
		queue_dequeue(pool_thread->task_req_queue, &item);
		task = item;
		if (task == NULL) {
			break;
		}
		switch (task->type) {
		case TASK_TYPE_ACCEPT:
			task->socklen = sizeof(task->remote);
			/*
			 * task->sd is negative if accept() fails; the task is
			 * still reported so the io thread re-arms its listen event.
			 */
			task->sd = accept(task->listen_sd, (struct sockaddr *)&task->remote, &task->socklen);
			queue_enqueue(pool_thread->task_res_queue, task, relase_task, task->task_buffer);
			pool_thread_task_done(pool_thread->notify_write);
			break;
		case TASK_TYPE_READ_DATA:
			n = read(task->sd, task->buffer, sizeof(task->buffer) - 1);
			if (n <= 0) {
				close(task->sd);
				relase_task(task->task_buffer, task);
				break;
			}
			task->buffer[n] = '\0';
			queue_enqueue(pool_thread->task_res_queue, task, relase_task, task->task_buffer);
			pool_thread_task_done(pool_thread->notify_write);
			break;
		case TASK_TYPE_WRITE_DATA:
			/*
			 * Content-Length is the body length without the string
			 * terminator, cast to int to match the %d conversion.
			 */
			len = snprintf(response, sizeof(response),
			    RESPONSE_HEADER RESPONSE_BODY,
			    (int)(sizeof(RESPONSE_BODY) - 1));
			if (len > 0) {
				if ((size_t)len >= sizeof(response)) {
					/* Output was truncated to the buffer size. */
					len = (int)sizeof(response) - 1;
				}
				/* write() may send only part of the data; loop until done. */
				sent = 0;
				while (sent < (size_t)len) {
					n = write(task->sd, response + sent, (size_t)len - sent);
					if (n < 0 && errno == EINTR) {
						continue;
					}
					if (n <= 0) {
						break;
					}
					sent += (size_t)n;
				}
			}
			close(task->sd);
			relase_task(task->task_buffer, task);
			break;
		default:
			abort();
		}
	}
	printf("finish pool thread\n");

	return NULL;
}

static void
terminate(int fd, short event, void *args)
{
        struct aio *aio;
	int i;

	printf("catch signal\n");
        aio = args;
	for (i = 0; i < sizeof(aio->pool_thread)/sizeof(aio->pool_thread[0]); i++) {
		pool_thread_stop(&aio->pool_thread[i]);
	}
        io_thread_stop(aio->notify_write);
        signal_del(&aio->sigterm);
        signal_del(&aio->sigint);
}

int
main(int argc, char *argv[])
{
	int i;
	struct aio aio;
	struct buffer_manager *task_buffer;
	struct queue *task_req_queue;
	struct queue *task_res_queue;
	int pipe_fd[2];
	sigset_t sig_set;

	memset(&aio, 0, sizeof(aio));
        buffer_manager_create(
	    &task_buffer,
	    THREAD_BUFFER_MANAGER_GLOW_ON,
	    sizeof(struct client_task),
	    10000);
	queue_create(
	    &task_req_queue, THREAD_QUEUE_MODE_WAIT, 10000);
	queue_create(
	    &task_res_queue, THREAD_QUEUE_MODE_WAIT, 10000);
        pipe(pipe_fd);
	aio.event_base = event_init();
	aio.notify_write = pipe_fd[1];

	signal_set(&aio.sigterm, SIGTERM, terminate, &aio);
	event_base_set(aio.event_base, &aio.sigterm);
	signal_add(&aio.sigterm, NULL);
	signal_set(&aio.sigint, SIGINT, terminate, &aio);
	event_base_set(aio.event_base, &aio.sigint);
	signal_add(&aio.sigint, NULL);

	sigfillset(&sig_set);
	pthread_sigmask(SIG_BLOCK, &sig_set, NULL);

	aio.io_thread.task_buffer = task_buffer; 
	aio.io_thread.task_req_queue = task_req_queue; 
	aio.io_thread.task_res_queue = task_res_queue; 
	aio.io_thread.notify_read = pipe_fd[0]; 
	if (pthread_create(&aio.io_thread.io_thread, NULL, io_thread_start, &aio.io_thread)) {
		return 1;
	}
	for (i = 0; i < sizeof(aio.pool_thread)/sizeof(aio.pool_thread[0]); i++) {
		aio.pool_thread[i].task_req_queue = task_req_queue;
		aio.pool_thread[i].task_res_queue = task_res_queue;
		aio.pool_thread[i].notify_write = pipe_fd[1];
		aio.pool_thread[i].run = 1;
		if (pthread_create(&aio.pool_thread[i].pool_thread, NULL, pool_thread_start, &aio.pool_thread[i])) {
			return 1;
		}
	}

	pthread_sigmask(SIG_UNBLOCK, &sig_set, NULL);
	
        event_base_dispatch(aio.event_base);

	pthread_join(aio.io_thread.io_thread, NULL);
	for (i = 0; i < sizeof(aio.pool_thread)/sizeof(aio.pool_thread[0]); i++) {
		pthread_join(aio.pool_thread[i].pool_thread, NULL);
	}

	close(pipe_fd[0]);
	close(pipe_fd[1]);
	queue_destroy(task_req_queue);
	queue_destroy(task_res_queue);
	buffer_manager_destroy(task_buffer);
	event_base_free(aio.event_base);

	return 0;
}

1: yoshid 2011年11月28日(月) 午後2時21分

に日本語でおk

2: poti 2011年11月29日(火) 午前11時27分

どこをどう見ても日本語だよ!