diff options
| author | Oskari Timperi <oskari.timperi@iki.fi> | 2014-05-29 15:37:29 +0300 |
|---|---|---|
| committer | Oskari Timperi <oskari.timperi@iki.fi> | 2014-05-29 15:37:29 +0300 |
| commit | 84241644c5a6e82f726daa4a9def2a480a0d5653 (patch) | |
| tree | 8a3aaf9e16c0f1c62483da62cf3646169c2e5e58 | |
| parent | 78156ded91e69b95493949a9ba4895e193adebad (diff) | |
| download | libuvh-84241644c5a6e82f726daa4a9def2a480a0d5653.tar.gz libuvh-84241644c5a6e82f726daa4a9def2a480a0d5653.zip | |
refactor refactor refactor
- add struct uvh_connection to make things simpler
- there can be multiple requests per connection!
- todo: pipelining support
- use SLIST from sys/queue.h to keep account of connections and requests
- refactoring here and there
| -rw-r--r-- | src/uvh.c | 456 |
1 files changed, 243 insertions, 213 deletions
@@ -2,6 +2,8 @@ #include "sds.h" #include "http_parser.h" +#include <sys/queue.h> + #ifndef offsetof #ifdef __GNUC__ #define offsetof(type, member) __builtin_offsetof (type, member) @@ -59,26 +61,35 @@ static int on_message_complete(http_parser *parser); static void uvh_request_write_chunk(struct uvh_request *req, sds chunk); +struct uvh_request_private; +struct uvh_connection; + struct uvh_server { + uv_tcp_t stream; struct sockaddr_storage addr; socklen_t addr_len; uv_loop_t *loop; struct http_parser_settings http_parser_settings; - uv_tcp_t stream; char stop; uvh_request_handler_cb request_handler; void *userdata; + SLIST_HEAD(connection_list, uvh_connection) connections; +}; + +struct uvh_connection +{ + uv_tcp_t stream; + struct uvh_server *server; + struct http_parser parser; + SLIST_ENTRY(uvh_connection) siblings; + SLIST_HEAD(request_list, uvh_request_private) requests; }; struct uvh_request_private { struct uvh_request req; - struct http_parser parser; - uv_tcp_t stream; - sds header_name; - sds header_value; - int header_state; + struct uvh_connection *connection; char keepalive; int send_status; sds send_headers; @@ -86,8 +97,15 @@ struct uvh_request_private int streaming; uvh_stream_cb stream_cb; void *stream_userdata; + SLIST_ENTRY(uvh_request_private) siblings; }; +static struct uvh_request_private *uvh_request_new(struct uvh_connection *conn); +static void uvh_request_free(struct uvh_request_private *req); + +static struct uvh_connection *uvh_connection_new(struct uvh_server *server); +static void uvh_connection_free(struct uvh_connection *conn); + UVH_EXTERN struct uvh_server *uvh_server_init(uv_loop_t *loop, void *data, uvh_request_handler_cb request_handler) { @@ -103,6 +121,8 @@ UVH_EXTERN struct uvh_server *uvh_server_init(uv_loop_t *loop, void *data, server->userdata = data; server->request_handler = request_handler; + SLIST_INIT(&server->connections); + rc = uv_tcp_init(loop, &server->stream); if (rc < 0) @@ -112,7 +132,6 @@ UVH_EXTERN struct uvh_server *uvh_server_init(uv_loop_t *loop, void *data, server->http_parser_settings.on_message_begin = on_message_begin; server->http_parser_settings.on_url = on_url; - // server->http_parser_settings.on_status = on_status; server->http_parser_settings.on_header_field = on_header_field; server->http_parser_settings.on_header_value = on_header_value; server->http_parser_settings.on_headers_complete = on_headers_complete; @@ -157,7 +176,15 @@ UVH_EXTERN int uvh_server_listen(struct uvh_server *server, const char *address, static void on_server_close(uv_handle_t *handle) { + struct uvh_server *server = (struct uvh_server *) handle; + struct uvh_connection *conn; + LOG_DEBUG("%s", __FUNCTION__); + + SLIST_FOREACH(conn, &server->connections, siblings) + { + uv_close((uv_handle_t *) &conn->stream, &close_cb); + } } UVH_EXTERN void uvh_server_stop(struct uvh_server *server) @@ -166,74 +193,11 @@ UVH_EXTERN void uvh_server_stop(struct uvh_server *server) uv_close((uv_handle_t *) &server->stream, &on_server_close); } -void request_init(struct uvh_request_private *req, struct uvh_server *server) -{ - if (req->req.header_count > 0) - { - int i; - for (i = 0; i < req->req.header_count; ++i) - { - sdsfree((sds) req->req.headers[i].name); - sdsfree((sds) req->req.headers[i].value); - } - - if (req->req.method) - sdsfree((sds) req->req.method); - - if (req->req.version) - sdsfree((sds) req->req.version); - - if (req->req.url.full) - sdsfree((sds) req->req.url.full); - - if (req->req.url.schema) - sdsfree((sds) req->req.url.schema); - - if (req->req.url.host) - sdsfree((sds) req->req.url.host); - - if (req->req.url.port) - sdsfree((sds) req->req.url.port); - - if (req->req.url.path) - sdsfree((sds) req->req.url.path); - - if (req->req.url.query) - sdsfree((sds) req->req.url.query); - - if (req->req.url.fragment) - sdsfree((sds) req->req.url.fragment); - - if (req->req.url.userinfo) - sdsfree((sds) req->req.url.userinfo); - - if (req->req.content) - sdsfree((sds) req->req.content); - } - - memset(&req->req, 0, sizeof(req->req)); - - req->req.server = server; - req->req.data = server->userdata; - - req->header_state = 0; - - req->send_body = sdsempty(); - req->send_headers = sdsempty(); - req->send_status = HTTP_OK; - - http_parser_init(&req->parser, HTTP_REQUEST); - req->parser.data = req; - - req->streaming = 0; - req->stream_cb = NULL; - req->stream_userdata = NULL; -} - static void on_connection(uv_stream_t *stream, int status) { - struct uvh_server *server = container_of((uv_tcp_t *) stream, - struct uvh_server, stream); + struct uvh_server *server = (struct uvh_server *) stream; + struct uvh_connection *connection; + uv_tcp_t *client; LOG_DEBUG("%s", __FUNCTION__); @@ -246,44 +210,37 @@ static void on_connection(uv_stream_t *stream, int status) if (server->stop) { LOG_WARNING("on_connection: stop bit set"); - uv_tcp_t *client = calloc(1, sizeof(*client)); - uv_tcp_init(server->loop, client); - uv_accept(stream, (uv_stream_t *) client); - uv_close((uv_handle_t *) client, NULL); - return; + goto close_conn; } - struct uvh_request_private *req = calloc(1, sizeof(*req)); - request_init(req, server); + connection = uvh_connection_new(server); - if (uv_tcp_init(server->loop, &req->stream)) - { - LOG_WARNING("failed to initialize uv_tcp_t"); - goto error; - } + if (!connection) + goto close_conn; - req->stream.data = req; + SLIST_INSERT_HEAD(&server->connections, connection, siblings); - if (uv_accept(stream, (uv_stream_t *) &req->stream) == 0) + if (uv_accept(stream, (uv_stream_t *) &connection->stream) == 0) { - uv_read_start((uv_stream_t *) &req->stream, alloc_cb, - read_cb); - return; + LOG_DEBUG("starting read on connection %p", connection); + uv_read_start((uv_stream_t *) &connection->stream, alloc_cb, read_cb); } else { - uv_close((uv_handle_t *) &req->stream, NULL); + // TODO: free the connection + uv_close((uv_handle_t *) &connection->stream, NULL); LOG_WARNING("failed to accept"); } -error: + return; - if (req) - { - sdsfree(req->send_body); - sdsfree(req->send_headers); - free(req); - } +close_conn: + + client = calloc(1, sizeof(*client)); + uv_tcp_init(server->loop, client); + uv_accept(stream, (uv_stream_t *) client); + // TODO: free the client + uv_close((uv_handle_t *) client, NULL); } static uv_buf_t alloc_cb(uv_handle_t *handle, size_t size) @@ -294,12 +251,9 @@ static uv_buf_t alloc_cb(uv_handle_t *handle, size_t size) static void read_cb(uv_stream_t *stream, ssize_t nread, uv_buf_t buf) { - struct uvh_request_private *req; - struct uvh_server *server; - int nparsed; - - req = (struct uvh_request_private *) stream->data; - server = req->req.server; + struct uvh_connection *connection = (struct uvh_connection *) stream; + struct uvh_server *server = connection->server; + size_t nparsed; LOG_DEBUG("read_cb: nread: %d, buf.len: %d", (int)nread, (int)buf.len); @@ -322,17 +276,21 @@ static void read_cb(uv_stream_t *stream, ssize_t nread, uv_buf_t buf) } } - nparsed = http_parser_execute(&req->parser, + nparsed = http_parser_execute(&connection->parser, &server->http_parser_settings, buf.base, nread); - LOG_DEBUG("nparsed:%d", nparsed); + LOG_DEBUG("nparsed:%d", (int) nparsed); - if (nparsed != nread) + if ((ssize_t) nparsed != nread) { LOG_ERROR("http parse error, closing connection"); uv_close((uv_handle_t *) stream, &close_cb); } + else if (nread == 0) + { + uv_close((uv_handle_t *) stream, &close_cb); + } out: @@ -341,135 +299,114 @@ out: static void close_cb(uv_handle_t *handle) { - LOG_DEBUG("%s", __FUNCTION__); - - struct uvh_request_private *p; + struct uvh_connection *connection = (struct uvh_connection *) handle; + struct uvh_server *server = connection->server; - p = (struct uvh_request_private *) handle->data; + LOG_DEBUG("%s", __FUNCTION__); - sdsfree((sds) p->req.method); - sdsfree((sds) p->req.version); - sdsfree((sds) p->req.url.full); - sdsfree((sds) p->req.url.schema); - sdsfree((sds) p->req.url.host); - sdsfree((sds) p->req.url.port); - sdsfree((sds) p->req.url.path); - sdsfree((sds) p->req.url.query); - sdsfree((sds) p->req.url.fragment); - sdsfree((sds) p->req.url.userinfo); - sdsfree((sds) p->req.content); - sdsfree((sds) p->send_body); - sdsfree((sds) p->send_headers); + SLIST_REMOVE(&server->connections, connection, uvh_connection, siblings); - free(p); + uvh_connection_free(connection); } static int on_message_begin(http_parser *parser) { - struct uvh_request_private *priv; - priv = (struct uvh_request_private *) parser->data; - priv->req.content = (const char *) sdsempty(); + struct uvh_connection *connection = (struct uvh_connection *) parser->data; + struct uvh_request_private *req; + + LOG_DEBUG("%s", __FUNCTION__); + + req = uvh_request_new(connection); + + SLIST_INSERT_HEAD(&connection->requests, req, siblings); + return 0; } static int on_url(http_parser *parser, const char *at, size_t len) { - struct uvh_request_private *priv; - priv = (struct uvh_request_private *) parser->data; + struct uvh_connection *connection = (struct uvh_connection *) parser->data; + struct uvh_request_private *req = SLIST_FIRST(&connection->requests); - LOG_DEBUG("on_url: <%.*s>", (int) len, at); + LOG_DEBUG("%s", __FUNCTION__); - if (!priv->req.url.full) + if (!req->req.url.full) { - priv->req.url.full = sdsnewlen(at, len); + req->req.url.full = sdsnewlen(at, len); } else { - priv->req.url.full = sdscatlen((sds) priv->req.url.full, - at, len); + req->req.url.full = sdscatlen((sds) req->req.url.full, at, len); } return 0; } -// static int on_status(http_parser *parser, const char *at, size_t len) -// { -// LOG_DEBUG("on_status: <%.*s>", (int) len, at); -// return 0; -// } +#define HEADER_COUNT req->req.header_count +#define CURRENT_HEADER req->req.headers[HEADER_COUNT] -static int on_header_field(http_parser *parser, const char *at, - size_t len) +static int on_header_field(http_parser *parser, const char *at, size_t len) { - struct uvh_request_private *priv; - struct uvh_request *req; + struct uvh_connection *connection = (struct uvh_connection *) parser->data; + struct uvh_request_private *req = SLIST_FIRST(&connection->requests); + + LOG_DEBUG("%s", __FUNCTION__); - priv = (struct uvh_request_private *) parser->data; - req = &priv->req; +start: - if (priv->header_state == 0) - { - priv->header_name = sdsnewlen(at, len); - } - else if (priv->header_state == 1) + if (!CURRENT_HEADER.name) { - priv->header_name = sdscatlen(priv->header_name, at, len); + CURRENT_HEADER.name = sdsnewlen(at, len); } - else if (priv->header_state == 2) + else { - req->headers[req->header_count].name = priv->header_name; - req->headers[req->header_count].value = priv->header_value; - req->header_count += 1; - - priv->header_name = sdsnewlen(at, len); - priv->header_value = NULL; + if (!CURRENT_HEADER.value) + { + CURRENT_HEADER.name = sdscatlen((sds) CURRENT_HEADER.name, at, len); + } + else + { + ++HEADER_COUNT; + goto start; + } } - priv->header_state = 1; - return 0; } -static int on_header_value(http_parser *parser, const char *at, - size_t len) +static int on_header_value(http_parser *parser, const char *at, size_t len) { - struct uvh_request_private *priv; + struct uvh_connection *connection = (struct uvh_connection *) parser->data; + struct uvh_request_private *req = SLIST_FIRST(&connection->requests); - priv = (struct uvh_request_private *) parser->data; + LOG_DEBUG("%s", __FUNCTION__); - if (priv->header_state == 1) + if (!CURRENT_HEADER.value) { - priv->header_value = sdsnewlen(at, len); + CURRENT_HEADER.value = sdsnewlen(at, len); } - else if (priv->header_state == 2) + else { - priv->header_value = sdscatlen(priv->header_value, at, len); + CURRENT_HEADER.value = sdscatlen((sds) CURRENT_HEADER.value, at, len); } - priv->header_state = 2; - return 0; } +#undef CURRENT_HEADER +#undef HEADER_COUNT + static int on_headers_complete(http_parser *parser) { - struct uvh_request_private *priv; - struct uvh_request *req; + struct uvh_connection *connection = (struct uvh_connection *) parser->data; + struct uvh_request_private *priv = SLIST_FIRST(&connection->requests); + struct uvh_request *req = &priv->req; struct http_parser_url url; - const char *full; - - priv = (struct uvh_request_private *) parser->data; - req = &priv->req; - full = req->url.full; + const char *full = req->url.full; - if (priv->header_state == 2) - { - req->headers[req->header_count].name = priv->header_name; - req->headers[req->header_count].value = priv->header_value; - req->header_count += 1; - } + LOG_DEBUG("%s", __FUNCTION__); - LOG_DEBUG("on_headers_complete"); + ++req->header_count; http_parser_parse_url(req->url.full, sdslen((sds) req->url.full), 1, &url); @@ -499,44 +436,136 @@ static int on_headers_complete(http_parser *parser) static int on_body(http_parser *parser, const char *at, size_t len) { - struct uvh_request_private *priv; - priv = (struct uvh_request_private *) parser->data; - priv->req.content = (const char *) sdscatlen((sds) priv->req.content, - at, len); + struct uvh_connection *connection = (struct uvh_connection *) parser->data; + struct uvh_request_private *req = SLIST_FIRST(&connection->requests); + + LOG_DEBUG("%s", __FUNCTION__); + + if (!req->req.content) + { + req->req.content = sdsnewlen(at, len); + } + else + { + req->req.content = sdscatlen((sds) req->req.content, at, len); + } + return 0; } static int on_message_complete(http_parser *parser) { - struct uvh_request_private *priv; - - priv = (struct uvh_request_private *) parser->data; + struct uvh_connection *connection = (struct uvh_connection *) parser->data; + struct uvh_request_private *req = SLIST_FIRST(&connection->requests); - LOG_DEBUG("on_message_complete"); + LOG_DEBUG("%s", __FUNCTION__); - priv->keepalive = http_should_keep_alive(parser); + req->keepalive = http_should_keep_alive(parser); - if (priv->req.content) - priv->req.content_length = sdslen((sds) priv->req.content); + if (req->req.content) + req->req.content_length = sdslen((sds) req->req.content); else - priv->req.content_length = 0; + req->req.content_length = 0; - priv->req.method = sdsnew(http_method_str(parser->method)); + req->req.method = sdsnew(http_method_str(parser->method)); - priv->req.version = sdsempty(); - priv->req.version = sdscatprintf((sds) priv->req.version, + req->req.version = sdsempty(); + req->req.version = sdscatprintf((sds) req->req.version, "HTTP/%d.%d", parser->http_major, parser->http_minor); - if (priv->req.server->request_handler) - priv->req.server->request_handler(&priv->req); + if (req->req.server->request_handler) + req->req.server->request_handler(&req->req); return 0; } +static struct uvh_request_private *uvh_request_new(struct uvh_connection *conn) +{ + struct uvh_request_private *req = calloc(1, sizeof(*req)); + req->connection = conn; + req->req.server = conn->server; + req->req.data = conn->server->userdata; + req->send_body = sdsempty(); + req->send_headers = sdsempty(); + req->send_status = HTTP_OK; + return req; +} + +static void uvh_request_free(struct uvh_request_private *req) +{ + int i; + + LOG_DEBUG("%s", __FUNCTION__); + + for (i = 0; i < req->req.header_count; ++i) + { + sdsfree((sds) req->req.headers[i].name); + sdsfree((sds) req->req.headers[i].value); + } + + sdsfree((sds) req->req.method); + sdsfree((sds) req->req.version); + + sdsfree((sds) req->req.url.full); + sdsfree((sds) req->req.url.schema); + sdsfree((sds) req->req.url.host); + sdsfree((sds) req->req.url.port); + sdsfree((sds) req->req.url.path); + sdsfree((sds) req->req.url.query); + sdsfree((sds) req->req.url.fragment); + sdsfree((sds) req->req.url.userinfo); + + sdsfree((sds) req->req.content); + + // TODO: what about send_body, send_headers + + free(req); +} + +static struct uvh_connection *uvh_connection_new(struct uvh_server *server) +{ + struct uvh_connection *connection = calloc(1, sizeof(*connection)); + connection->server = server; + + http_parser_init(&connection->parser, HTTP_REQUEST); + connection->parser.data = connection; + + SLIST_INIT(&connection->requests); + + if (uv_tcp_init(server->loop, &connection->stream) < 0) + { + LOG_WARNING("failed to initialize uv_tcp_t"); + goto error; + } + + return connection; + +error: + + if (connection) + uvh_connection_free(connection); + + return NULL; +} + +static void uvh_connection_free(struct uvh_connection *conn) +{ + struct uvh_request_private *req; + + LOG_DEBUG("%s", __FUNCTION__); + + SLIST_FOREACH(req, &conn->requests, siblings) + { + uvh_request_free(req); + } + + free(conn); +} + struct uvh_write_request { - uv_buf_t buf; uv_write_t wreq; + uv_buf_t buf; struct uvh_request_private *req; }; @@ -549,8 +578,7 @@ static void uvh_write_request_free(struct uvh_write_request *req) static void after_request_write(uv_write_t *req, int status) { LOG_DEBUG("%s", __FUNCTION__); - struct uvh_write_request *wreq = container_of(req, struct uvh_write_request, - wreq); + struct uvh_write_request *wreq = (struct uvh_write_request *) req; (void) status; uvh_write_request_free(wreq); } @@ -568,7 +596,8 @@ static void uvh_request_write_sds(struct uvh_request *req, sds data, wreq->req = p; - uv_write(&wreq->wreq, (uv_stream_t *) &p->stream, &wreq->buf, 1, cb); + uv_write(&wreq->wreq, (uv_stream_t *) &p->connection->stream, &wreq->buf, + 1, cb); } UVH_EXTERN void uvh_request_write(struct uvh_request *req, @@ -680,7 +709,6 @@ UVH_EXTERN void uvh_request_end(struct uvh_request *req) if (!p->keepalive) { uvh_request_write_header(req, "Connection", "close"); - // uv_close at some point? } uvh_request_write_sds(req, p->send_headers, &after_request_write); @@ -691,9 +719,11 @@ UVH_EXTERN void uvh_request_end(struct uvh_request *req) else sdsfree(p->send_body); - if (p->keepalive && !p->streaming) + if (!p->streaming) { - request_init(p, req->server); + struct uvh_connection *connection = p->connection; + SLIST_REMOVE(&connection->requests, p, uvh_request_private, siblings); + uvh_request_free(p); } } @@ -706,7 +736,9 @@ static void after_last_chunk_write(uv_write_t *req, int status) (void)status; - request_init(wreq->req, wreq->req->req.server); + SLIST_REMOVE(&wreq->req->connection->requests, wreq->req, + uvh_request_private, siblings); + uvh_request_free(wreq->req); uvh_write_request_free(wreq); } @@ -761,9 +793,7 @@ static void uvh_request_write_chunk(struct uvh_request *req, sds chunk) } else { - if (chunk) - sdsfree(chunk); - + sdsfree(chunk); callback = &after_last_chunk_write; } |
