aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2017-03-06 21:32:49 +0200
committerOskari Timperi <oskari.timperi@iki.fi>2017-03-06 21:32:49 +0200
commita3dbe2dff7a56485eb4054368c2d2d1c4dee3437 (patch)
treed3f0cd0ddd3e539c1ea3325184abe1dcbebcabd1
parenta062d934aef40829d9559a8ca83147ea4c44108e (diff)
downloadmqtt-a3dbe2dff7a56485eb4054368c2d2d1c4dee3437.tar.gz
mqtt-a3dbe2dff7a56485eb4054368c2d2d1c4dee3437.zip
Add forgotten files
-rw-r--r--src/message.c11
-rw-r--r--src/message.h40
-rw-r--r--src/stringstream.c115
-rw-r--r--src/stringstream.h21
4 files changed, 187 insertions, 0 deletions
diff --git a/src/message.c b/src/message.c
new file mode 100644
index 0000000..35d9c32
--- /dev/null
+++ b/src/message.c
@@ -0,0 +1,11 @@
+#include "message.h"
+#include "stringstream.h"
+#include "stream_mqtt.h"
+#include "packet.h"
+
+void MqttMessageFree(MqttMessage *msg)
+{
+ bdestroy(msg->topic);
+ bdestroy(msg->payload);
+ free(msg);
+}
diff --git a/src/message.h b/src/message.h
new file mode 100644
index 0000000..04a3d61
--- /dev/null
+++ b/src/message.h
@@ -0,0 +1,40 @@
+#ifndef MESSAGE_H
+#define MESSAGE_H
+
+#include <stdint.h>
+
+#include "queue.h"
+#include <bstrlib/bstrlib.h>
+
+enum MqttMessageState
+{
+ MqttMessageStateQueued,
+ MqttMessageStatePublish,
+ MqttMessageStateWaitPubAck,
+ MqttMessageStateWaitPubRec,
+ MqttMessageStateWaitPubComp,
+ MqttMessageStateWaitPubRel
+};
+
+typedef struct MqttMessage MqttMessage;
+
+struct MqttMessage
+{
+ int state;
+ int qos;
+ int retain;
+ int dup;
+ int padding;
+ uint16_t id;
+ int64_t timestamp;
+ bstring topic;
+ bstring payload;
+ TAILQ_ENTRY(MqttMessage) chain;
+};
+
+typedef struct MqttMessageList MqttMessageList;
+TAILQ_HEAD(MqttMessageList, MqttMessage);
+
+void MqttMessageFree(MqttMessage *msg);
+
+#endif
diff --git a/src/stringstream.c b/src/stringstream.c
new file mode 100644
index 0000000..4353932
--- /dev/null
+++ b/src/stringstream.c
@@ -0,0 +1,115 @@
+#include "stringstream.h"
+
+#include <assert.h>
+
+static int StringStreamClose(Stream *base)
+{
+ StringStream *ss = (StringStream *) base;
+ bdestroy(ss->buffer);
+ ss->buffer = NULL;
+ return 0;
+}
+
+static int64_t StringStreamRead(void *ptr, size_t size, Stream *stream)
+{
+ StringStream *ss = (StringStream *) stream;
+ int64_t available = blength(ss->buffer) - ss->pos;
+ void *bufptr;
+
+ if (available <= 0)
+ {
+ return -1;
+ }
+
+ if (size > (size_t) available)
+ size = available;
+
+ /* Use a temp buffer pointer to make some warnings disappear when using
+ GCC */
+ bufptr = bdataofs(ss->buffer, ss->pos);
+ memcpy(ptr, bufptr, size);
+
+ ss->pos += size;
+
+ return size;
+}
+
+static int64_t StringStreamWrite(const void *ptr, size_t size, Stream *stream)
+{
+ StringStream *ss = (StringStream *) stream;
+ struct tagbstring buf;
+ if (ss->buffer->mlen <= 0)
+ return -1;
+ btfromblk(buf, ptr, size);
+ bsetstr(ss->buffer, ss->pos, &buf, '\0');
+ ss->pos += size;
+ return size;
+}
+
+int StringStreamSeek(Stream *base, int64_t offset, int whence)
+{
+ StringStream *ss = (StringStream *) base;
+ int64_t newpos = 0;
+
+ if (whence == SEEK_SET)
+ {
+ newpos = offset;
+ }
+ else if (whence == SEEK_CUR)
+ {
+ newpos = ss->pos + offset;
+ }
+ else if (whence == SEEK_END)
+ {
+ newpos = blength(ss->buffer) - offset;
+ }
+ else
+ {
+ return -1;
+ }
+
+ if (newpos > blength(ss->buffer))
+ return -1;
+
+ if (newpos < 0)
+ return -1;
+
+ ss->pos = newpos;
+
+ return 0;
+}
+
+int64_t StringStreamTell(Stream *base)
+{
+ StringStream *ss = (StringStream *) base;
+ return ss->pos;
+}
+
+static const StreamOps StringStreamOps =
+{
+ StringStreamRead,
+ StringStreamWrite,
+ StringStreamClose,
+ StringStreamSeek,
+ StringStreamTell
+};
+
+int StringStreamInit(StringStream *stream)
+{
+ assert(stream != NULL);
+ memset(stream, 0, sizeof(*stream));
+ stream->pos = 0;
+ stream->buffer = bfromcstr("");
+ stream->base.ops = &StringStreamOps;
+ return 0;
+}
+
+int StringStreamInitFromBstring(StringStream *stream, bstring buffer)
+{
+ assert(stream != NULL);
+ memset(stream, 0, sizeof(*stream));
+ stream->pos = 0;
+ stream->buffer = buffer;
+ stream->base.ops = &StringStreamOps;
+ return 0;
+}
diff --git a/src/stringstream.h b/src/stringstream.h
new file mode 100644
index 0000000..60d42fb
--- /dev/null
+++ b/src/stringstream.h
@@ -0,0 +1,21 @@
+#ifndef STRINGSTREAM_H
+#define STRINGSTREAM_H
+
+#include "stream.h"
+#include <bstrlib/bstrlib.h>
+#include <stdio.h>
+
+typedef struct StringStream StringStream;
+
+struct StringStream
+{
+ Stream base;
+ bstring buffer;
+ int64_t pos;
+};
+
+int StringStreamInit(StringStream *stream);
+
+int StringStreamInitFromBstring(StringStream *stream, bstring buffer);
+
+#endif