aboutsummaryrefslogtreecommitdiff
path: root/src/stream_mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream_mqtt.c')
-rw-r--r--src/stream_mqtt.c98
1 files changed, 98 insertions, 0 deletions
diff --git a/src/stream_mqtt.c b/src/stream_mqtt.c
new file mode 100644
index 0000000..25d2e56
--- /dev/null
+++ b/src/stream_mqtt.c
@@ -0,0 +1,98 @@
+#include "stream_mqtt.h"
+#include "stringbuf.h"
+
+#include <string.h>
+
+int64_t StreamReadMqttString(char **s, size_t *len, Stream *stream)
+{
+ StringBuf buf;
+ int64_t rv;
+
+ if ((rv = StreamReadMqttStringBuf(&buf, stream)) == -1)
+ return -1;
+
+ *s = buf.data;
+ *len = buf.len;
+
+ return rv;
+}
+
+int64_t StreamWriteMqttString(const char *s, int len, Stream *stream)
+{
+ StringBuf buf;
+
+ if (len < 0)
+ len = strlen(s);
+
+ buf.data = (char *) s;
+ buf.len = len;
+
+ return StreamWriteMqttStringBuf(&buf, stream);
+}
+
+int64_t StreamReadMqttStringBuf(struct StringBuf *buf, Stream *stream)
+{
+ uint16_t len;
+
+ if (StreamReadUint16Be(&len, stream) == -1)
+ return -1;
+
+ if (StringBufInit(buf, len) == -1)
+ return -1;
+
+ if (StreamRead(buf->data, len, stream) == -1)
+ {
+ StringBufDeinit(buf);
+ return -1;
+ }
+
+ buf->len = len;
+
+ return len+2;
+}
+
+int64_t StreamWriteMqttStringBuf(const struct StringBuf *buf, Stream *stream)
+{
+ if (StreamWriteUint16Be(buf->len, stream) == -1)
+ return -1;
+
+ if (StreamWrite(buf->data, buf->len, stream) == -1)
+ return -1;
+
+ return 2 + buf->len;
+}
+
+int64_t StreamReadRemainingLength(size_t *remainingLength, Stream *stream)
+{
+ size_t multiplier = 1;
+ unsigned char encodedByte;
+ *remainingLength = 0;
+ do
+ {
+ if (StreamRead(&encodedByte, 1, stream) != 1)
+ return -1;
+ *remainingLength += (encodedByte & 127) * multiplier;
+ if (multiplier > 128*128*128)
+ return -1;
+ multiplier *= 128;
+ }
+ while ((encodedByte & 128) != 0);
+ return 0;
+}
+
+int64_t StreamWriteRemainingLength(size_t remainingLength, Stream *stream)
+{
+ size_t nbytes = 0;
+ do
+ {
+ unsigned char encodedByte = remainingLength % 128;
+ remainingLength /= 128;
+ if (remainingLength > 0)
+ encodedByte |= 128;
+ if (StreamWrite(&encodedByte, 1, stream) != 1)
+ return -1;
+ ++nbytes;
+ }
+ while (remainingLength > 0);
+ return nbytes;
+}