ようこそゲストさん

無能日記

メッセージ欄

2011年12月の日記

一覧で表示する

2011/12/17(土) websocketの処理の仕方を考えてみた。

はてブ 2011/12/17 23:01 R&D (Websocket)poti
websocketを処理するプログラムを書くときどう処理すれば割と奇麗になりそうか、考えてみた。結果、こんなイメージ。(図の要素の名前は結構適当)

ソケットを管理する部分と、各機能やモジュールを管理する部分の2つに分けてみた。

ソケットを管理する部分では、コネクションを張ったり、データを変換したり、1つまたは複数のソケットにデータを送ったりする(図のforwardingって適切な表現じゃないな)部分を処理する。

モジュールを管理する部分では、各モジュールからのデータをカプセル化したり、きたフレームをディスパッチして適切なモジュールに渡す部分を処理する。

そして、各モジュールからはモジュール管理部分のAPIを叩くことで、ソケット処理に関する部分を隠蔽してやるようにすれば、割と奇麗にまとまるんじゃないかなと思っている。

すごくずれてるということはないはず。多分。

まぁ、websocketなんてただの土管でしかないので、pppとかとおなじノリでやればいいんだよね?きっと。てことはwebsocket vpnとかやらなきゃ。

ちなみに、socket.ioは結局のところソケット管理部分とモジュール管理部分をまとめて実装してる感じ。要するに、node.jsを使う限りはsocket.ioを使っとけってことか。

1: momijiame 『もっとシンプルに概念図だけを!』 (2011/12/18 17:22)

2011/12/15(木) libevent2

はてブ 2011/12/15 21:00 その他poti
libevent2で書いてみた。
libevent2いいよlibevent2

/* need libevent 2.XX */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/queue.h>
#include <sys/param.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/thread.h>
#include <signal.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <semaphore.h>
#include <stdint.h>

/* Number of entries in the pool_thread array of struct aio (worker threads). */
#define THREAD_POOL_NUM 4

/* Notification type codes; not referenced by the code visible in this file. */
#define NOTIFY_TYPE_STOP        1
#define NOTIFY_TYPE_TASK_DONE   2

/*
 * Growth policy of a buffer_manager ("GLOW" is the original spelling).
 * With GLOW_ON, buffer_manager_get() allocates another chunk when the free
 * list is empty; with any other value it fails with ENOBUFS instead.
 */
#define THREAD_BUFFER_MANAGER_GLOW_ON   1
#define THREAD_BUFFER_MANAGER_GLOW_OFF  2

/* Fixed HTML document sent as the body of every response. */
#define RESPONSE_BODY "<html>\n" \
                      "  <head>\n" \
                      "  </head>\n" \
                      "  <body>\n" \
                      "  hello world !!\n" \
                      "  </body>\n" \
                      "</html>"

/* printf-style response header; the single %d receives the Content-Length. */
#define RESPONSE_HEADER "HTTP/1.1 200 OK\r\n" \
                        "Content-Type: text/html\r\n" \
                        "Content-Length: %d\r\n\r\n"

/* Kind of work a client_task asks a pool thread to perform. */
enum task_type {
        TASK_TYPE_ACCEPT = 1,   /* accept() a connection on the listening socket */
        TASK_TYPE_READ_DATA,    /* the client socket is readable: read the request */
        TASK_TYPE_WRITE_DATA,   /* the client socket is writable: send the response */
};

/*
 * Blocking behaviour of a queue.  The values are tested with '&' by the
 * queue functions, so they are used as distinct bits.
 */
enum queue_mode {
        THREAD_QUEUE_MODE_NOWAIT = 1,   /* enqueue fails with ENOBUFS when full; dequeue never blocks */
        THREAD_QUEUE_MODE_WAIT,         /* enqueue and dequeue block on semaphores */
};

/*
 * Header of one malloc()ed chunk owned by a buffer_manager.  The chunk
 * continues past this header with unit_count slots, each made of a buffer_t
 * header followed by buffer_size payload bytes.
 */
struct alloc_ptr {
        LIST_ENTRY(alloc_ptr) next;     /* link in buffer_manager.alloc_ptr_head */
        /* followed in memory by: (sizeof(buffer_t) + buffer_size) * unit_count bytes */
};
typedef struct alloc_ptr alloc_ptr_t;

/*
 * Header placed immediately before every payload handed out by
 * buffer_manager_get(); buffer_manager_put() recovers it by stepping back
 * sizeof(buffer_t) bytes from the payload pointer.
 */
struct buffer {
        LIST_ENTRY(buffer) next;        /* link in the free or used list of the manager */
        /* followed in memory by: buffer_size payload bytes */
};
typedef struct buffer buffer_t;

/* Pool of fixed-size buffers carved out of larger malloc()ed chunks. */
struct buffer_manager {
        int glow;                                               /* THREAD_BUFFER_MANAGER_GLOW_ON / _OFF growth policy */
        LIST_HEAD(alloc_ptr_head, alloc_ptr) alloc_ptr_head;    /* every chunk allocated so far */
        LIST_HEAD(free_buffer_head, buffer) free_buffer_head;   /* buffers available to buffer_manager_get() */
        LIST_HEAD(used_buffer_head, buffer) used_buffer_head;   /* buffers currently handed out */
        size_t buffer_size;                                     /* payload bytes per buffer (after rounding in create) */
        int unit_count;                                         /* number of buffers per chunk */
        pthread_mutex_t lock;                                   /* taken by buffer_manager_get() and buffer_manager_put() */
};

/* One element stored in a queue; allocated from the queue's buffer_manager. */
struct queue_data {
        void *data;                                     /* caller's payload pointer */
        void (*free_cb)(void *free_args, void *data);   /* called by queue_destroy() for elements still queued */
        void *free_args;                                /* first argument passed to free_cb */
        TAILQ_ENTRY(queue_data) next;                   /* link in queue.queue_data_head */
};

/* Bounded FIFO shared between the I/O thread and the pool threads. */
struct queue {
        enum queue_mode mode;                                   /* blocking behaviour, see enum queue_mode */
        pthread_mutex_t lock;                                   /* protects the list and the counters below */
        struct buffer_manager *buffer_manager;                  /* fixed-size pool that supplies queue_data nodes */
        TAILQ_HEAD(queue_data_head, queue_data) queue_data_head; /* queued elements, oldest first */
        int current_queue_size;                                 /* incremented on enqueue, decremented on dequeue */
        int max_queue_size;                                     /* capacity given to queue_create() */
        sem_t enqueue_sem;                                      /* waited on by a producer that found the queue full */
        sem_t dequeue_sem;                                      /* posted per enqueue and per cancel; waited on in WAIT mode */
        int dequeue_cancel;                                     /* cancel requests not yet consumed by queue_dequeue() */
        int queue_full_count;                                   /* times queue_enqueue() found no free node */
};

/* State of one connection (or pending accept) passed through the task queue. */
struct client_task {
        enum task_type type;                    /* what the pool thread should do next */
        int sd;                                 /* client socket, set by accept() in the pool thread */
        struct event read_write;                /* libevent event watching sd for read or write */
        char buffer[65535];                     /* receives the data read from the client */
        union {
                struct sockaddr_in in;
                struct sockaddr_in6 in6;
        } remote;                               /* peer address filled in by accept() */
        socklen_t socklen;                      /* in/out length argument for accept() */
        struct buffer_manager *task_buffer;     /* pool this task came from and is released to */
        struct queue *task_queue;               /* queue on which this task is submitted */
        void *io_thread;                        /* owning struct io_thread, stored untyped */
};

/* State of the single thread that runs the socket event loop. */
struct io_thread {
        struct event_base *event_base;          /* event loop created in io_thread_start() */
        pthread_t io_thread;                    /* thread handle, joined by main() */
        int listen_sd;                          /* listening TCP socket */
        struct event *listen;                   /* one-shot read event on listen_sd */
        struct event *notify;                   /* persistent read event on notify_read */
        struct buffer_manager *task_buffer;     /* pool that supplies client_task objects */
        struct queue *task_queue;               /* queue feeding the pool threads */
        int notify_read;                        /* read end of the stop-notification pipe */
};

/* State of one worker thread. */
struct pool_thread {
        int run;                        /* loop condition of pool_thread_start(); cleared by pool_thread_stop() */
        pthread_t pool_thread;          /* thread handle, joined by main() */
        struct queue *task_queue;       /* queue the worker dequeues tasks from */
};

/* Top-level program state owned by main(). */
struct aio {
        struct event_base *event_base;                  /* main-thread loop used for signal events */
        struct event *sigterm;                          /* SIGTERM handler event */
        struct event *sigint;                           /* SIGINT handler event */
        struct io_thread io_thread;                     /* the socket I/O thread */
        struct pool_thread pool_thread[THREAD_POOL_NUM]; /* the worker threads */
        int notify_write;                               /* write end of the stop-notification pipe */
};


/*
 * Allocate one more chunk for a buffer manager and put every buffer of that
 * chunk on the manager's free list.
 *
 * A chunk is a single malloc() region: an alloc_ptr_t header followed by
 * unit_count slots, each slot being a buffer_t header plus buffer_size
 * payload bytes.  The chunk is linked into alloc_ptr_head.  The manager's
 * lock is not taken here.
 *
 * buffer_manager: manager to extend.
 * alloc_ptr:      optional out-parameter; set to the new chunk on success and
 *                 to NULL on failure.  May itself be NULL.
 *
 * Returns 0 on success, 1 when malloc() fails.
 */
static int
allocate_buffer(
    struct buffer_manager *buffer_manager,
    alloc_ptr_t **alloc_ptr)
{
        const size_t slot_size = sizeof(buffer_t) + buffer_manager->buffer_size;
        alloc_ptr_t *chunk;
        char *cursor;
        int remaining;

        if (alloc_ptr != NULL) {
                *alloc_ptr = NULL;
        }

        chunk = malloc(sizeof(alloc_ptr_t) +
            (slot_size * buffer_manager->unit_count));
        if (chunk == NULL) {
                return 1;
        }
        LIST_INSERT_HEAD(&buffer_manager->alloc_ptr_head, chunk, next);

        /* The first slot starts right behind the chunk header. */
        cursor = (char *)(chunk + 1);
        for (remaining = buffer_manager->unit_count; remaining > 0; remaining--) {
                buffer_t *slot = (buffer_t *)cursor;

                LIST_INSERT_HEAD(&buffer_manager->free_buffer_head, slot, next);
                cursor += slot_size;
        }

        if (alloc_ptr != NULL) {
                *alloc_ptr = chunk;
        }

        return 0;
}

static int
buffer_manager_create(
    struct buffer_manager **buffer_manager,
    int glow,
    size_t buffer_size,
    int unit_count)
{
        alloc_ptr_t *new_alloc_ptr = NULL;
        struct buffer_manager *new = NULL;

        *buffer_manager = NULL;
        buffer_size = (buffer_size - (buffer_size & 3)) + 4;
        new = malloc(sizeof(struct buffer_manager));
        if (new == NULL) {
                goto fail;
        }
        memset(new, 0, sizeof(struct buffer_manager));
        LIST_INIT(&new->alloc_ptr_head);
        LIST_INIT(&new->free_buffer_head);
        new->buffer_size = buffer_size;
        new->unit_count = unit_count;
        if (allocate_buffer(new, &new_alloc_ptr)) {
                goto fail;
        }
        if (pthread_mutex_init(&new->lock, NULL)) {
                goto fail;

        }
        LIST_INIT(&new->used_buffer_head);
        new->glow = glow;
        *buffer_manager = new;

        return 0;

fail:
        free(new_alloc_ptr);
        free(new);

        return 1;
}

/*
 * Release every chunk owned by a buffer manager, destroy its mutex and free
 * the manager itself.  All buffers obtained from it become invalid.
 *
 * Always returns 0.
 */
static int
buffer_manager_destroy(
    struct buffer_manager *buffer_manager)
{
        alloc_ptr_t *chunk;

        /* Pop chunks off the head until the list is empty. */
        while ((chunk = LIST_FIRST(&buffer_manager->alloc_ptr_head)) != NULL) {
                LIST_REMOVE(chunk, next);
                free(chunk);
        }

        pthread_mutex_destroy(&buffer_manager->lock);
        free(buffer_manager);

        return 0;
}

/*
 * Take one buffer out of the pool.
 *
 * buffer_manager: pool to allocate from.
 * data:           out-parameter; receives the payload pointer (the address
 *                 just past the buffer_t header), or NULL on failure.
 *
 * When the free list is empty and the manager was created with
 * THREAD_BUFFER_MANAGER_GLOW_ON, one more chunk is allocated first.
 *
 * Returns 0 on success.  Returns 1 with errno set to ENOBUFS when no buffer
 * is available.  Aborts the process if the mutex cannot be locked or unlocked.
 */
static int
buffer_manager_get(
    struct buffer_manager *buffer_manager,
    void **data)
{
        buffer_t *slot;
        int status = 0;

        *data = NULL;
        if (pthread_mutex_lock(&buffer_manager->lock) != 0) {
                abort();
        }

        slot = LIST_FIRST(&buffer_manager->free_buffer_head);
        if (slot == NULL) {
                /* Pool exhausted: grow it if allowed, otherwise report ENOBUFS. */
                if (buffer_manager->glow != THREAD_BUFFER_MANAGER_GLOW_ON ||
                    allocate_buffer(buffer_manager, NULL) != 0) {
                        errno = ENOBUFS;
                        status = 1;
                        goto unlock;
                }
                slot = LIST_FIRST(&buffer_manager->free_buffer_head);
        }

        /* Move the slot from the free list to the used list. */
        LIST_REMOVE(slot, next);
        LIST_INSERT_HEAD(&buffer_manager->used_buffer_head, slot, next);
        *data = (char *)slot + sizeof(buffer_t);

unlock:
        if (pthread_mutex_unlock(&buffer_manager->lock) != 0) {
                abort();
        }

        return status;
}

/*
 * Give a buffer back to the pool.
 *
 * buffer_manager: pool the buffer was obtained from.
 * data:           payload pointer previously returned by buffer_manager_get().
 *
 * The buffer_t header sits directly in front of the payload; it is unlinked
 * from the list it is on and pushed onto the free list.
 *
 * Always returns 0.  Aborts the process if the mutex cannot be locked or
 * unlocked.
 */
static int
buffer_manager_put(
    struct buffer_manager *buffer_manager,
    void *data)
{
        buffer_t *slot = (buffer_t *)((char *)data - sizeof(buffer_t));

        if (pthread_mutex_lock(&buffer_manager->lock) != 0) {
                abort();
        }

        LIST_REMOVE(slot, next);
        LIST_INSERT_HEAD(&buffer_manager->free_buffer_head, slot, next);

        if (pthread_mutex_unlock(&buffer_manager->lock) != 0) {
                abort();
        }

        return 0;
}

/*
 * Create a bounded FIFO queue.
 *
 * queue:          out-parameter; receives the new queue, or NULL on failure.
 * mode:           THREAD_QUEUE_MODE_WAIT or THREAD_QUEUE_MODE_NOWAIT.
 * max_queue_size: capacity; also the size of the fixed (non-growing) pool
 *                 from which queue nodes are taken.
 *
 * Returns 0 on success, 1 on failure.  On failure everything that was already
 * initialised (node pool, semaphores) is torn down again in reverse order.
 */
static int
queue_create(
    struct queue **queue,
    enum queue_mode mode,
    int max_queue_size) {
        struct queue *new;

        *queue = NULL;
        new = calloc(1, sizeof(*new));
        if (new == NULL) {
                goto fail;
        }
        if (buffer_manager_create(
            &new->buffer_manager,
            THREAD_BUFFER_MANAGER_GLOW_OFF,
            sizeof(struct queue_data),
            max_queue_size)) {
                goto fail_free;
        }
        if (sem_init(&new->dequeue_sem, 0, 0) != 0) {
                goto fail_buffer_manager;
        }
        if (sem_init(&new->enqueue_sem, 0, 0) != 0) {
                goto fail_dequeue_sem;
        }
        if (pthread_mutex_init(&new->lock, NULL) != 0) {
                goto fail_enqueue_sem;
        }
        TAILQ_INIT(&new->queue_data_head);
        new->mode = mode;
        new->max_queue_size = max_queue_size;
        *queue = new;

        return 0;

fail_enqueue_sem:
        sem_destroy(&new->enqueue_sem);
fail_dequeue_sem:
        sem_destroy(&new->dequeue_sem);
fail_buffer_manager:
        buffer_manager_destroy(new->buffer_manager);
fail_free:
        free(new);
fail:
        return 1;
}

/*
 * Destroy a queue created by queue_create().
 *
 * Every element still queued is removed; its free_cb (when one was supplied)
 * is called with (free_args, data) so the owner can release the payload.
 * Then the node pool, both semaphores, the mutex and the queue itself are
 * released.  No other thread may be using the queue any more.
 *
 * Always returns 0.
 */
static int
queue_destroy(struct queue *queue) {
        struct queue_data *queue_data;

        while ((queue_data = TAILQ_FIRST(&queue->queue_data_head)) != NULL) {
                TAILQ_REMOVE(&queue->queue_data_head, queue_data, next);
                /* A NULL callback means the caller keeps ownership of data. */
                if (queue_data->free_cb != NULL) {
                        queue_data->free_cb(queue_data->free_args, queue_data->data);
                }
                buffer_manager_put(
                    queue->buffer_manager,
                    (void *)queue_data);
        }
        buffer_manager_destroy(queue->buffer_manager);
        /* Both semaphores were set up by sem_init() and must be released too. */
        sem_destroy(&queue->enqueue_sem);
        sem_destroy(&queue->dequeue_sem);
        pthread_mutex_destroy(&queue->lock);
        free(queue);

        return 0;
}

/*
 * Append one element to the tail of the queue.
 *
 * queue:     target queue.
 * data:      payload pointer to store.
 * free_cb:   callback queue_destroy() invokes for elements still queued.
 * free_args: first argument handed to free_cb.
 *
 * A node is taken from the queue's buffer manager.  When none is free
 * (ENOBUFS), queue_full_count is incremented and then:
 *   - in NOWAIT mode the function returns 1 with errno set to ENOBUFS;
 *   - otherwise it drops the lock, waits on enqueue_sem and retries.
 * Any other allocation error, and any mutex or semaphore failure, aborts the
 * process.
 *
 * Returns 0 once the element has been queued and dequeue_sem posted.
 */
static int
queue_enqueue(
    struct queue *queue,
    void *data,
    void (*free_cb)(void *free_args, void *data),
    void *free_args) {
        struct queue_data *queue_data;

        if (pthread_mutex_lock(&queue->lock) != 0) {
                abort();
        }
        /* Retry loop: leaves via break once a node has been obtained. */
        while (1) {
                if (buffer_manager_get(queue->buffer_manager, (void *)&queue_data)) {
                        if (errno == ENOBUFS) {
                                queue->queue_full_count++;
                                if (queue->mode & THREAD_QUEUE_MODE_NOWAIT) {
                                        errno = ENOBUFS;
                                        if (pthread_mutex_unlock(&queue->lock) != 0) {
                                                abort();
                                        }
                                        return 1;
                                }
                                /* The lock is released while sleeping so a consumer can dequeue. */
                                if (pthread_mutex_unlock(&queue->lock) != 0) {
                                        abort();
                                }
                                if (sem_wait(&queue->enqueue_sem)) {
                                        abort();
                                }
                                if (pthread_mutex_lock(&queue->lock) != 0) {
                                        abort();
                                }
                                continue;
                        } else {
                                abort();
                        }
                }
                break;
        }
        queue_data->data = data;
        queue_data->free_cb = free_cb;
        queue_data->free_args = free_args;
        TAILQ_INSERT_TAIL(&queue->queue_data_head, queue_data, next);
        queue->current_queue_size++;
        if (pthread_mutex_unlock(&queue->lock) != 0) {
                abort();
        }
        /* Post the dequeue semaphore so that a waiting queue_dequeue() wakes up. */
        if (sem_post(&queue->dequeue_sem)) {
                abort();
        }

        return 0;
}

/*
 * Remove the oldest element from the queue.
 *
 * queue: source queue.
 * data:  out-parameter; receives the stored payload pointer, or NULL when the
 *        queue is empty.
 *
 * In WAIT mode the call first blocks on dequeue_sem.  If it then finds the
 * queue empty, the wake-up must have come from queue_dequeue_cancel(): one
 * pending cancel is consumed and *data is set to NULL.  An empty queue with
 * no pending cancel aborts the process.  In NOWAIT mode an empty queue simply
 * yields *data == NULL.
 *
 * After a successful removal enqueue_sem is posted when the queue is in WAIT
 * mode or when the size has just dropped to max_queue_size - 1.
 *
 * Always returns 0.  Mutex, semaphore and buffer-pool failures abort.
 */
static int
queue_dequeue(
    struct queue *queue,
    void **data) {
        struct queue_data *queue_data;

        if (queue->mode & THREAD_QUEUE_MODE_WAIT) {
                if (sem_wait(&queue->dequeue_sem)) {
                        abort();
                }
        }
        if (pthread_mutex_lock(&queue->lock) != 0) {
                abort();
        }
        queue_data = TAILQ_FIRST(&queue->queue_data_head);
        if (queue_data == NULL) {
                if (queue->mode & THREAD_QUEUE_MODE_WAIT) {
                        /* Woken with nothing queued: only a cancel request explains that. */
                        if (queue->dequeue_cancel) {
                                queue->dequeue_cancel -= 1;
                        } else {
                                abort();
                        }
                }
                *data = NULL;
        } else {
                TAILQ_REMOVE(&queue->queue_data_head, queue_data, next);
                queue->current_queue_size--;
                /* Copy the payload out before the node goes back to the pool. */
                *data = queue_data->data;
                if (buffer_manager_put(
                    queue->buffer_manager,
                    queue_data)) {
                        abort();
                }
                if (queue->mode & THREAD_QUEUE_MODE_WAIT ||
                    queue->current_queue_size == queue->max_queue_size -1) {
                        if (sem_post(&queue->enqueue_sem)) {
                                abort();
                        }
                }
        }
        if (pthread_mutex_unlock(&queue->lock) != 0) {
                abort();
        }

        return 0;
}

/*
 * Wake up one consumer blocked in queue_dequeue() without giving it data.
 *
 * A pending-cancel counter is incremented under the lock and dequeue_sem is
 * posted; the woken consumer sees an empty queue, consumes the cancel and
 * returns NULL data.
 *
 * Always returns 0.  As in the other queue functions, a mutex or semaphore
 * failure aborts the process: a lost post would leave the consumer blocked
 * forever.
 */
static int
queue_dequeue_cancel(
    struct queue *queue)
{
        if (pthread_mutex_lock(&queue->lock) != 0) {
                abort();
        }
        queue->dequeue_cancel += 1;
        if (pthread_mutex_unlock(&queue->lock) != 0) {
                abort();
        }
        if (sem_post(&queue->dequeue_sem) != 0) {
                abort();
        }

        return 0;
}

/*
 * Fetch an unused client_task from the task pool.
 *
 * task_buffer: pool to allocate from.
 * task:        out-parameter; receives the task, or NULL when the pool could
 *              not provide one (the pool's return value is not inspected).
 */
static void
get_task(struct buffer_manager *task_buffer, struct client_task **task) {
        void *slot;

        buffer_manager_get(task_buffer, &slot);
        *task = slot;
}

/*
 * Return a client_task to its pool.  The (args, data) signature matches the
 * free_cb callback type of the queue, so it can be passed to queue_enqueue().
 *
 * args: the struct buffer_manager the task was taken from.
 * data: the task to release.
 */
static void
relase_task(void *args, void *data) {
        buffer_manager_put((struct buffer_manager *)args, data);
}

/*
 * libevent callback for a client socket becoming readable or writable.
 * Runs in the I/O thread; the actual work is handed to the pool threads by
 * queueing the task.
 *
 * sd:   the client socket (unused; the task carries it).
 * ev:   bitmask of the events that fired.
 * args: the struct client_task registered with the event.
 *
 * The event flags are a bitmask, so they are tested with '&' rather than
 * compared for equality.  If neither EV_READ nor EV_WRITE is set, the task's
 * previous type is kept.
 */
static void
client_data(int sd, short ev, void *args) {
        struct client_task *task = args;

        (void)sd;
        if (ev & EV_READ) {
                task->type = TASK_TYPE_READ_DATA;
        } else if (ev & EV_WRITE) {
                task->type = TASK_TYPE_WRITE_DATA;
        }
        queue_enqueue(task->task_queue, task, relase_task, task->task_buffer);
}

/*
 * libevent callback for the listening socket becoming readable.  Runs in the
 * I/O thread and queues a TASK_TYPE_ACCEPT task; a pool thread performs the
 * accept() and re-arms the (one-shot) listen event.
 *
 * sd:   the listening socket (unused).
 * ev:   triggering events (unused).
 * args: the struct io_thread that owns the listener.
 */
static void
io_thread_on_connect(int sd, short ev, void *args) {
        struct io_thread *io_thread = args;
        struct client_task *task;

        (void)sd;
        (void)ev;
        get_task(io_thread->task_buffer, &task);
        if (task == NULL) {
                /*
                 * No task object available (allocation failed).  The listen
                 * event is one-shot, so re-arm it here: the connection stays
                 * in the backlog and is retried on the next loop iteration
                 * instead of dereferencing a NULL task.
                 */
                event_add(io_thread->listen, NULL);
                return;
        }
        task->type = TASK_TYPE_ACCEPT;
        task->task_buffer = io_thread->task_buffer;
        task->task_queue = io_thread->task_queue;
        task->io_thread = io_thread;
        queue_enqueue(task->task_queue, task, relase_task, task->task_buffer);
}

/*
 * libevent callback for the stop-notification pipe.  Runs in the I/O thread
 * when io_thread_stop() has written a byte.
 *
 * fd:   read end of the notification pipe.
 * ev:   triggering events (unused).
 * args: the struct io_thread to shut down.
 *
 * Besides removing the listen and notify events, the event loop is told to
 * exit explicitly.  Removing the two events alone is not enough: the loop
 * keeps running for as long as any client event is still pending, which can
 * leave the thread (and the pthread_join() in main) waiting forever.
 */
static void
io_thread_notify(int fd, short ev, void *args) {
        struct io_thread *io_thread = args;
        char data;
        ssize_t n;

        (void)ev;
        printf("on notify\n");
        /* Drain the notification byte; retry when interrupted by a signal. */
        do {
                n = read(fd, &data, 1);
        } while (n < 0 && errno == EINTR);
        event_del(io_thread->listen);
        event_del(io_thread->notify);
        event_base_loopexit(io_thread->event_base, NULL);
}

/*
 * Ask the I/O thread to stop by writing one byte to the notification pipe.
 *
 * notify_write: write end of the notification pipe.
 *
 * Returns 0 when the byte was written, 1 when write() failed.  The function
 * previously fell off its end without returning a value and wrote an
 * uninitialised byte.
 */
static int
io_thread_stop(int notify_write) {
        const char data = 0;
        ssize_t written;

        printf("send io thread stop\n");
        /* Retry when the write is interrupted by a signal. */
        do {
                written = write(notify_write, &data, 1);
        } while (written < 0 && errno == EINTR);

        return (written == 1) ? 0 : 1;
}

/*
 * Entry point of the I/O thread.
 *
 * args: the struct io_thread to run; notify_read, task_buffer and task_queue
 *       must already be set.
 *
 * Creates the thread's event loop, registers the stop-notification event,
 * opens a listening TCP socket on 0.0.0.0:10080, registers a one-shot accept
 * event and runs the loop until it ends.  All libevent objects and the socket
 * are released on both the normal and the failure path; the failure path
 * previously leaked the event base and the notify event and did not check
 * their creation.
 *
 * Always returns NULL.
 */
static void *
io_thread_start(void *args) {
        int s = -1;
        struct addrinfo  hints, *res = NULL;
        struct io_thread *io_thread;
        int reuse_addr = 1;

        printf("start io thread\n");
        io_thread = args;
        io_thread->listen = NULL;
        io_thread->notify = NULL;
        io_thread->event_base = event_base_new();
        if (io_thread->event_base == NULL) {
                goto on_fail;
        }

        io_thread->notify = event_new(io_thread->event_base, io_thread->notify_read, EV_READ | EV_PERSIST, io_thread_notify, io_thread);
        if (io_thread->notify == NULL || event_add(io_thread->notify, NULL) != 0) {
                goto on_fail;
        }

        memset(&hints, 0, sizeof(hints));
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV | AI_PASSIVE;
        if (getaddrinfo("0.0.0.0", "10080", &hints, &res) != 0) {
                res = NULL;
                goto on_fail;
        }
        if ((s = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) {
                goto on_fail;
        }
        if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &reuse_addr, sizeof(reuse_addr))) {
                goto on_fail;
        }
        if (bind(s, res->ai_addr, res->ai_addrlen) < 0) {
                goto on_fail;
        }
        if (listen(s, THREAD_POOL_NUM) < 0) {
                goto on_fail;
        }
        freeaddrinfo(res);
        res = NULL;
        io_thread->listen_sd = s;
        /* One-shot event: the pool thread re-arms it after each accept(). */
        io_thread->listen = event_new(io_thread->event_base, io_thread->listen_sd, EV_READ, io_thread_on_connect, io_thread);
        if (io_thread->listen == NULL || event_add(io_thread->listen, NULL) != 0) {
                goto on_fail;
        }

        event_base_dispatch(io_thread->event_base);

        close(s);
        event_free(io_thread->listen);
        io_thread->listen = NULL;
        event_free(io_thread->notify);
        io_thread->notify = NULL;
        event_base_free(io_thread->event_base);
        io_thread->event_base = NULL;

        printf("io thread finish\n");

        return NULL;

on_fail:
        if (res != NULL) {
                freeaddrinfo(res);
        }
        if (io_thread->listen != NULL) {
                event_free(io_thread->listen);
                io_thread->listen = NULL;
        }
        if (io_thread->notify != NULL) {
                event_free(io_thread->notify);
                io_thread->notify = NULL;
        }
        if (io_thread->event_base != NULL) {
                event_base_free(io_thread->event_base);
                io_thread->event_base = NULL;
        }
        if (s >= 0) {
                close(s);
        }

        printf("error in io thread\n");

        return NULL;
}

/*
 * Ask one worker thread to finish: clear its run flag, then post a cancel on
 * its task queue so that a worker blocked in queue_dequeue() wakes up.
 */
static void
pool_thread_stop(struct pool_thread *pool_thread) {
        struct queue *tasks = pool_thread->task_queue;

        pool_thread->run = 0;
        queue_dequeue_cancel(tasks);
}

static void *
pool_thread_start(void *args) {
        struct pool_thread *pool_thread;
        struct client_task *task;
        struct io_thread *io_thread;
        char type;
        int len;
        char response[4096];

        printf("start pool thread\n");
        pool_thread = args;
        while (pool_thread->run) {
                queue_dequeue(pool_thread->task_queue, (void *)&task);
                if (task == NULL) {
                        break;
                }
                switch (task->type) {
                case TASK_TYPE_ACCEPT:
                        io_thread = task->io_thread;
                        task->socklen = sizeof(task->remote);
                        task->sd = accept(io_thread->listen_sd, (struct sockaddr *)&task->remote, &task->socklen);
                        event_assign(io_thread->listen, io_thread->event_base, io_thread->listen_sd, EV_READ, io_thread_on_connect, io_thread);
                        //event_priority_set(io_thread->listen, 0);
                        event_add(io_thread->listen, NULL);
                        event_assign(&task->read_write, io_thread->event_base, task->sd, EV_READ, client_data, task);
                        event_add(&task->read_write, NULL);
                        //printf("accept %d-%p\n", task->sd, task->read_write);
                        break;
                case TASK_TYPE_READ_DATA:
                        //printf("read %d-%p\n", task->sd, task->read_write);
                        io_thread = task->io_thread;
                        if ((len = read(task->sd, task->buffer, sizeof(task->buffer) - 1)) <= 0) {
                                //printf("failed in read\n");
                                close(task->sd);
                                relase_task(task->task_buffer , task);
                                break;
                        }
                        task->buffer[len] = '\0';
                        event_assign(&task->read_write, io_thread->event_base, task->sd, EV_WRITE, client_data, task);
                        //event_priority_set(io_thread->listen, 1);
                        event_add(&task->read_write, NULL);
                        break;
                case TASK_TYPE_WRITE_DATA:
                        //printf("write %d-%p\n", task->sd, task->read_write);
                        len = snprintf(response, sizeof(response), RESPONSE_HEADER RESPONSE_BODY, sizeof(RESPONSE_BODY));
                        //printf("res - %s\n", response);
                        write(task->sd, response, len);
                        close(task->sd);
                        relase_task(task->task_buffer , task);
                        break;
                default:
                        abort();
                }
        }
        printf("finish pool thread\n");

        return NULL;
}

static void
terminate(int fd, short event, void *args)
{
        struct aio *aio;
        int i;

        printf("catch signal\n");
        aio = args;
        for (i = 0; i < sizeof(aio->pool_thread)/sizeof(aio->pool_thread[0]); i++) {
                pool_thread_stop(&aio->pool_thread[i]);
        }
        io_thread_stop(aio->notify_write);
        evsignal_del(aio->sigterm);
        evsignal_del(aio->sigint);
}

/*
 * Program entry point.
 *
 * Sets up the task pool, the task queue, the stop-notification pipe and the
 * SIGTERM/SIGINT handlers, starts the I/O thread and the worker threads with
 * all signals blocked (so that only the main thread receives them), runs the
 * signal event loop until terminate() removes the handlers, then joins all
 * threads and releases every resource.
 *
 * Every setup step is checked; previously a failed allocation, pipe() or
 * libevent call led to a NULL or garbage value being used later on.
 *
 * Returns 0 on a clean shutdown, 1 on any setup failure.
 */
int
main(int argc, char *argv[])
{
        size_t i;
        struct aio aio;
        struct buffer_manager *task_buffer = NULL;
        struct queue *task_queue = NULL;
        sigset_t sig_set;
        int pipe_fd[2] = { -1, -1 };
        int status = 1;

        (void)argc;
        (void)argv;

        evthread_use_pthreads();
        memset(&aio, 0, sizeof(aio));
        if (buffer_manager_create(
            &task_buffer,
            THREAD_BUFFER_MANAGER_GLOW_ON,
            sizeof(struct client_task),
            10000)) {
                fprintf(stderr, "failed to create task buffer\n");
                goto out;
        }
        if (queue_create(
            &task_queue, THREAD_QUEUE_MODE_WAIT, 10000)) {
                fprintf(stderr, "failed to create task queue\n");
                goto out;
        }
        if (pipe(pipe_fd) != 0) {
                pipe_fd[0] = pipe_fd[1] = -1;
                fprintf(stderr, "failed to create notify pipe\n");
                goto out;
        }
        aio.notify_write = pipe_fd[1];
        aio.event_base = event_base_new();
        if (aio.event_base == NULL) {
                fprintf(stderr, "failed to create event base\n");
                goto out;
        }

        aio.sigterm = evsignal_new(aio.event_base, SIGTERM, terminate, &aio);
        if (aio.sigterm == NULL || evsignal_add(aio.sigterm, NULL) != 0) {
                fprintf(stderr, "failed to register SIGTERM handler\n");
                goto out;
        }
        aio.sigint = evsignal_new(aio.event_base, SIGINT, terminate, &aio);
        if (aio.sigint == NULL || evsignal_add(aio.sigint, NULL) != 0) {
                fprintf(stderr, "failed to register SIGINT handler\n");
                goto out;
        }

        /* Threads inherit the mask: block everything while they are created. */
        sigfillset(&sig_set);
        pthread_sigmask(SIG_BLOCK, &sig_set, NULL);

        aio.io_thread.task_buffer = task_buffer;
        aio.io_thread.task_queue = task_queue;
        aio.io_thread.notify_read = pipe_fd[0];
        if (pthread_create(&aio.io_thread.io_thread, NULL, io_thread_start, &aio.io_thread)) {
                /* Exit immediately; nothing else is running yet. */
                return 1;
        }
        for (i = 0; i < sizeof(aio.pool_thread)/sizeof(aio.pool_thread[0]); i++) {
                aio.pool_thread[i].task_queue = task_queue;
                aio.pool_thread[i].run = 1;
                if (pthread_create(&aio.pool_thread[i].pool_thread, NULL, pool_thread_start, &aio.pool_thread[i])) {
                        /*
                         * Other threads already use the shared state, so it is
                         * not torn down here; leaving main ends the process.
                         */
                        return 1;
                }
        }

        pthread_sigmask(SIG_UNBLOCK, &sig_set, NULL);

        event_base_dispatch(aio.event_base);

        pthread_join(aio.io_thread.io_thread, NULL);
        for (i = 0; i < sizeof(aio.pool_thread)/sizeof(aio.pool_thread[0]); i++) {
                pthread_join(aio.pool_thread[i].pool_thread, NULL);
        }
        status = 0;

out:
        /* The queue goes first: its free callback returns tasks to task_buffer. */
        if (task_queue != NULL) {
                queue_destroy(task_queue);
        }
        if (task_buffer != NULL) {
                buffer_manager_destroy(task_buffer);
        }
        if (aio.sigterm != NULL) {
                event_free(aio.sigterm);
        }
        if (aio.sigint != NULL) {
                event_free(aio.sigint);
        }
        if (aio.event_base != NULL) {
                event_base_free(aio.event_base);
        }
        if (pipe_fd[0] >= 0) {
                close(pipe_fd[0]);
        }
        if (pipe_fd[1] >= 0) {
                close(pipe_fd[1]);
        }

        return status;
}