diff options
| author | Oskari Timperi <oskari.timperi@iki.fi> | 2017-03-06 21:32:49 +0200 |
|---|---|---|
| committer | Oskari Timperi <oskari.timperi@iki.fi> | 2017-03-06 21:32:49 +0200 |
| commit | a3dbe2dff7a56485eb4054368c2d2d1c4dee3437 (patch) | |
| tree | d3f0cd0ddd3e539c1ea3325184abe1dcbebcabd1 | |
| parent | a062d934aef40829d9559a8ca83147ea4c44108e (diff) | |
| download | mqtt-a3dbe2dff7a56485eb4054368c2d2d1c4dee3437.tar.gz mqtt-a3dbe2dff7a56485eb4054368c2d2d1c4dee3437.zip | |
Add forgotten files
| -rw-r--r-- | src/message.c | 11 | ||||
| -rw-r--r-- | src/message.h | 40 | ||||
| -rw-r--r-- | src/stringstream.c | 115 | ||||
| -rw-r--r-- | src/stringstream.h | 21 |
4 files changed, 187 insertions, 0 deletions
diff --git a/src/message.c b/src/message.c new file mode 100644 index 0000000..35d9c32 --- /dev/null +++ b/src/message.c @@ -0,0 +1,11 @@ +#include "message.h" +#include "stringstream.h" +#include "stream_mqtt.h" +#include "packet.h" + +void MqttMessageFree(MqttMessage *msg) +{ + bdestroy(msg->topic); + bdestroy(msg->payload); + free(msg); +} diff --git a/src/message.h b/src/message.h new file mode 100644 index 0000000..04a3d61 --- /dev/null +++ b/src/message.h @@ -0,0 +1,40 @@ +#ifndef MESSAGE_H +#define MESSAGE_H + +#include <stdint.h> + +#include "queue.h" +#include <bstrlib/bstrlib.h> + +enum MqttMessageState +{ + MqttMessageStateQueued, + MqttMessageStatePublish, + MqttMessageStateWaitPubAck, + MqttMessageStateWaitPubRec, + MqttMessageStateWaitPubComp, + MqttMessageStateWaitPubRel +}; + +typedef struct MqttMessage MqttMessage; + +struct MqttMessage +{ + int state; + int qos; + int retain; + int dup; + int padding; + uint16_t id; + int64_t timestamp; + bstring topic; + bstring payload; + TAILQ_ENTRY(MqttMessage) chain; +}; + +typedef struct MqttMessageList MqttMessageList; +TAILQ_HEAD(MqttMessageList, MqttMessage); + +void MqttMessageFree(MqttMessage *msg); + +#endif diff --git a/src/stringstream.c b/src/stringstream.c new file mode 100644 index 0000000..4353932 --- /dev/null +++ b/src/stringstream.c @@ -0,0 +1,115 @@ +#include "stringstream.h" + +#include <assert.h> + +static int StringStreamClose(Stream *base) +{ + StringStream *ss = (StringStream *) base; + bdestroy(ss->buffer); + ss->buffer = NULL; + return 0; +} + +static int64_t StringStreamRead(void *ptr, size_t size, Stream *stream) +{ + StringStream *ss = (StringStream *) stream; + int64_t available = blength(ss->buffer) - ss->pos; + void *bufptr; + + if (available <= 0) + { + return -1; + } + + if (size > (size_t) available) + size = available; + + /* Use a temp buffer pointer to make some warnings disappear when using + GCC */ + bufptr = bdataofs(ss->buffer, ss->pos); + memcpy(ptr, bufptr, size); + + ss->pos += size; + + return size; +} + +static int64_t StringStreamWrite(const void *ptr, size_t size, Stream *stream) +{ + StringStream *ss = (StringStream *) stream; + struct tagbstring buf; + if (ss->buffer->mlen <= 0) + return -1; + btfromblk(buf, ptr, size); + bsetstr(ss->buffer, ss->pos, &buf, '\0'); + ss->pos += size; + return size; +} + +int StringStreamSeek(Stream *base, int64_t offset, int whence) +{ + StringStream *ss = (StringStream *) base; + int64_t newpos = 0; + + if (whence == SEEK_SET) + { + newpos = offset; + } + else if (whence == SEEK_CUR) + { + newpos = ss->pos + offset; + } + else if (whence == SEEK_END) + { + newpos = blength(ss->buffer) - offset; + } + else + { + return -1; + } + + if (newpos > blength(ss->buffer)) + return -1; + + if (newpos < 0) + return -1; + + ss->pos = newpos; + + return 0; +} + +int64_t StringStreamTell(Stream *base) +{ + StringStream *ss = (StringStream *) base; + return ss->pos; +} + +static const StreamOps StringStreamOps = +{ + StringStreamRead, + StringStreamWrite, + StringStreamClose, + StringStreamSeek, + StringStreamTell +}; + +int StringStreamInit(StringStream *stream) +{ + assert(stream != NULL); + memset(stream, 0, sizeof(*stream)); + stream->pos = 0; + stream->buffer = bfromcstr(""); + stream->base.ops = &StringStreamOps; + return 0; +} + +int StringStreamInitFromBstring(StringStream *stream, bstring buffer) +{ + assert(stream != NULL); + memset(stream, 0, sizeof(*stream)); + stream->pos = 0; + stream->buffer = buffer; + stream->base.ops = &StringStreamOps; + return 0; +} diff --git a/src/stringstream.h b/src/stringstream.h new file mode 100644 index 0000000..60d42fb --- /dev/null +++ b/src/stringstream.h @@ -0,0 +1,21 @@ +#ifndef STRINGSTREAM_H +#define STRINGSTREAM_H + +#include "stream.h" +#include <bstrlib/bstrlib.h> +#include <stdio.h> + +typedef struct StringStream StringStream; + +struct StringStream +{ + Stream base; + bstring buffer; + int64_t pos; +}; + +int StringStreamInit(StringStream *stream); + +int StringStreamInitFromBstring(StringStream *stream, bstring buffer); + +#endif |
