diff options
Diffstat (limited to 'src/stream_mqtt.c')
| -rw-r--r-- | src/stream_mqtt.c | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/src/stream_mqtt.c b/src/stream_mqtt.c new file mode 100644 index 0000000..25d2e56 --- /dev/null +++ b/src/stream_mqtt.c @@ -0,0 +1,98 @@ +#include "stream_mqtt.h" +#include "stringbuf.h" + +#include <string.h> + +int64_t StreamReadMqttString(char **s, size_t *len, Stream *stream) +{ + StringBuf buf; + int64_t rv; + + if ((rv = StreamReadMqttStringBuf(&buf, stream)) == -1) + return -1; + + *s = buf.data; + *len = buf.len; + + return rv; +} + +int64_t StreamWriteMqttString(const char *s, int len, Stream *stream) +{ + StringBuf buf; + + if (len < 0) + len = strlen(s); + + buf.data = (char *) s; + buf.len = len; + + return StreamWriteMqttStringBuf(&buf, stream); +} + +int64_t StreamReadMqttStringBuf(struct StringBuf *buf, Stream *stream) +{ + uint16_t len; + + if (StreamReadUint16Be(&len, stream) == -1) + return -1; + + if (StringBufInit(buf, len) == -1) + return -1; + + if (StreamRead(buf->data, len, stream) == -1) + { + StringBufDeinit(buf); + return -1; + } + + buf->len = len; + + return len+2; +} + +int64_t StreamWriteMqttStringBuf(const struct StringBuf *buf, Stream *stream) +{ + if (StreamWriteUint16Be(buf->len, stream) == -1) + return -1; + + if (StreamWrite(buf->data, buf->len, stream) == -1) + return -1; + + return 2 + buf->len; +} + +int64_t StreamReadRemainingLength(size_t *remainingLength, Stream *stream) +{ + size_t multiplier = 1; + unsigned char encodedByte; + *remainingLength = 0; + do + { + if (StreamRead(&encodedByte, 1, stream) != 1) + return -1; + *remainingLength += (encodedByte & 127) * multiplier; + if (multiplier > 128*128*128) + return -1; + multiplier *= 128; + } + while ((encodedByte & 128) != 0); + return 0; +} + +int64_t StreamWriteRemainingLength(size_t remainingLength, Stream *stream) +{ + size_t nbytes = 0; + do + { + unsigned char encodedByte = remainingLength % 128; + remainingLength /= 128; + if (remainingLength > 0) + encodedByte |= 128; + if (StreamWrite(&encodedByte, 1, stream) != 1) + return -1; + ++nbytes; + } + while (remainingLength > 0); + return nbytes; +} |
