aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2014-05-25 14:05:44 +0300
committerOskari Timperi <oskari.timperi@iki.fi>2014-05-25 14:05:44 +0300
commit626e12cdce18fdafe48a27bb51d98267dabd8984 (patch)
tree72781edd2f838152353b75f9ab8419dfc469a5be
parent0ae56484d334114120c59c0f4593c212b8bf2d08 (diff)
downloadlibuvh-626e12cdce18fdafe48a27bb51d98267dabd8984.tar.gz
libuvh-626e12cdce18fdafe48a27bb51d98267dabd8984.zip
add uvh_request_stream() to allow streaming data to client
-rw-r--r--src/uvh.c105
-rw-r--r--src/uvh.h5
2 files changed, 100 insertions, 10 deletions
diff --git a/src/uvh.c b/src/uvh.c
index 4366967..6a3e27f 100644
--- a/src/uvh.c
+++ b/src/uvh.c
@@ -80,6 +80,9 @@ struct uvh_request_private
int send_status;
sds send_headers;
sds send_body;
+ int streaming;
+ uvh_stream_cb stream_cb;
+ void *stream_userdata;
};
struct uvh_server *uvh_server_init(uv_loop_t *loop, void *data,
@@ -480,13 +483,16 @@ struct uvh_write_request
static void after_request_write(uv_write_t *req, int status)
{
+ LOG_DEBUG("%s", __FUNCTION__);
struct uvh_write_request *wreq = container_of(req, struct uvh_write_request,
wreq);
+ (void) status;
sdsfree((sds) wreq->buf.base);
free(wreq);
}
-static void uvh_request_write_sds(struct uvh_request *req, sds data)
+static void uvh_request_write_sds(struct uvh_request *req, sds data,
+ uv_write_cb cb)
{
struct uvh_request_private *p = container_of(req,
struct uvh_request_private, req);
@@ -498,8 +504,7 @@ static void uvh_request_write_sds(struct uvh_request *req, sds data)
wreq->req = p;
- uv_write(&wreq->wreq, (uv_stream_t *) &p->stream, &wreq->buf, 1,
- &after_request_write);
+ uv_write(&wreq->wreq, (uv_stream_t *) &p->stream, &wreq->buf, 1, cb);
}
void uvh_request_write(struct uvh_request *req,
@@ -582,11 +587,14 @@ void uvh_request_end(struct uvh_request *req)
uvh_request_write_sds(req, sdscatprintf(sdsempty(),
"%s %d %s\r\n", p->req.version, p->send_status,
- http_status_code_str(p->send_status)));
+ http_status_code_str(p->send_status)), &after_request_write);
- sds content_len = sdscatprintf(sdsempty(), "%d", (int)sdslen(p->send_body));
- uvh_request_write_header(req, "Content-Length", content_len);
- sdsfree(content_len);
+ if (!p->streaming)
+ {
+ sds content_len = sdscatprintf(sdsempty(), "%d", (int)sdslen(p->send_body));
+ uvh_request_write_header(req, "Content-Length", content_len);
+ sdsfree(content_len);
+ }
LOG_DEBUG("keepalive: %d", p->keepalive);
@@ -596,7 +604,84 @@ void uvh_request_end(struct uvh_request *req)
// uv_close at some point?
}
- uvh_request_write_sds(req, p->send_headers);
- uvh_request_write_sds(req, sdsnew("\r\n"));
- uvh_request_write_sds(req, p->send_body);
+ uvh_request_write_sds(req, p->send_headers, &after_request_write);
+ uvh_request_write_sds(req, sdsnew("\r\n"), &after_request_write);
+
+ if (!p->streaming)
+ uvh_request_write_sds(req, p->send_body, &after_request_write);
+}
+
+static void uvh_request_write_chunk(struct uvh_request *req, sds chunk);
+
+static void after_chunk_write(uv_write_t *req, int status)
+{
+ LOG_DEBUG("%s", __FUNCTION__);
+
+ (void)status;
+
+ struct uvh_write_request *wreq = container_of(req, struct uvh_write_request,
+ wreq);
+
+ struct uvh_request_private *p = wreq->req;
+
+ char *chunk;
+
+ int chunklen = p->stream_cb(&chunk, p->stream_userdata);
+
+ if (chunklen == 0)
+ {
+ uvh_request_write_chunk(&p->req, sdsempty());
+ }
+ else
+ {
+ uvh_request_write_chunk(&p->req, sdsnewlen(chunk, chunklen));
+ free(chunk);
+ }
+}
+
+static void after_last_chunk_write(uv_write_t *req, int status)
+{
+ LOG_DEBUG("%s", __FUNCTION__);
+ (void)req;
+ (void)status;
+}
+
+static void uvh_request_write_chunk(struct uvh_request *req, sds chunk)
+{
+ LOG_DEBUG("%s", __FUNCTION__);
+
+ sds chunklen = sdscatprintf(sdsempty(), "%X\r\n",
+ (unsigned int) sdslen(chunk));
+
+ uvh_request_write_sds(req, chunklen, NULL);
+
+ if (sdslen(chunk) == 0)
+ {
+ uvh_request_write_sds(req, sdsnew("\r\n"), &after_last_chunk_write);
+ }
+ else
+ {
+ uvh_request_write_sds(req, chunk, NULL);
+ uvh_request_write_sds(req, sdsnew("\r\n"), &after_chunk_write);
+ }
+}
+
+void uvh_request_stream(struct uvh_request *req, uvh_stream_cb callback,
+ void *data)
+{
+ struct uvh_request_private *p = container_of(req,
+ struct uvh_request_private, req);
+
+ p->streaming = 1;
+ p->stream_cb = callback;
+ p->stream_userdata = data;
+
+ uvh_request_write_header(req, "Transfer-Encoding", "chunked");
+
+ uvh_request_end(req);
+
+ char *chunk;
+ int chunklen = callback(&chunk, data);
+ uvh_request_write_chunk(req, sdsnewlen(chunk, chunklen));
+ free(chunk);
}
diff --git a/src/uvh.h b/src/uvh.h
index 76754a8..80554aa 100644
--- a/src/uvh.h
+++ b/src/uvh.h
@@ -130,4 +130,9 @@ const char *uvh_request_get_header(struct uvh_request *req,
void uvh_request_end(struct uvh_request *req);
+typedef int (*uvh_stream_cb)(char **buffer, void *data);
+
+void uvh_request_stream(struct uvh_request *req, uvh_stream_cb callback,
+ void *data);
+
#endif /* UVH_H */