diff options
| author | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-19 16:23:21 +0200 |
|---|---|---|
| committer | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-19 16:23:21 +0200 |
| commit | 7f8f79eb9d3cab407410814b2db261d89f11af2e (patch) | |
| tree | e363f87522001e063fedf8a7ad42740da3edce94 /amalgamation | |
| parent | 8198f6d6beb3c8af3768236070089112c094b92e (diff) | |
| download | mqtt-7f8f79eb9d3cab407410814b2db261d89f11af2e.tar.gz mqtt-7f8f79eb9d3cab407410814b2db261d89f11af2e.zip | |
Update amalgamationv0.4
Diffstat (limited to 'amalgamation')
| -rw-r--r-- | amalgamation/mqtt.c | 177 | ||||
| -rw-r--r-- | amalgamation/mqtt.h | 4 |
2 files changed, 141 insertions, 40 deletions
diff --git a/amalgamation/mqtt.c b/amalgamation/mqtt.c index 40f3a22..1c8f88a 100644 --- a/amalgamation/mqtt.c +++ b/amalgamation/mqtt.c @@ -5152,8 +5152,8 @@ typedef struct MqttPacketSubscribe MqttPacketSubscribe; struct MqttPacketSubscribe { MqttPacket base; - bstring topicFilter; - char qos; + struct bstrList *topicFilters; + int *qos; }; typedef struct MqttPacketSubAck MqttPacketSubAck; @@ -5161,7 +5161,7 @@ typedef struct MqttPacketSubAck MqttPacketSubAck; struct MqttPacketSubAck { MqttPacket base; - unsigned char returnCode; + unsigned char *returnCode; }; typedef struct MqttPacketUnsubscribe MqttPacketUnsubscribe; @@ -5284,7 +5284,7 @@ void MqttPacketFree(MqttPacket *packet) else if (MqttPacketType(packet) == MqttPacketTypeSubscribe) { MqttPacketSubscribe *p = (MqttPacketSubscribe *) packet; - bdestroy(p->topicFilter); + bstrListDestroy(p->topicFilters); } else if (MqttPacketType(packet) == MqttPacketTypeUnsubscribe) { @@ -5374,7 +5374,16 @@ static size_t MqttPacketConnectGetRemainingLength(const MqttPacketConnect *packe static size_t MqttPacketSubscribeGetRemainingLength(const MqttPacketSubscribe *packet) { - return 2 + MqttStringLengthSerialized(packet->topicFilter) + 1; + size_t remaining = 2; + int i; + + for (i = 0; i < packet->topicFilters->qty; ++i) + { + remaining += MqttStringLengthSerialized(packet->topicFilters->entry[i]); + remaining += 1; + } + + return remaining; } static size_t MqttPacketUnsubscribeGetRemainingLength(const MqttPacketUnsubscribe *packet) @@ -5529,14 +5538,21 @@ static int MqttPacketConnectSerialize(const MqttPacketConnect *packet, Stream *s static int MqttPacketSubscribeSerialize(const MqttPacketSubscribe *packet, Stream *stream) { + int i; + if (MqttPacketWithIdSerialize((const MqttPacket *) packet, stream) == -1) return -1; - if (StreamWriteMqttString(packet->topicFilter, stream) == -1) - return -1; + for (i = 0; i < packet->topicFilters->qty; ++i) + { + unsigned char qos = (unsigned char) packet->qos[i]; - if (StreamWrite(&packet->qos, 1, stream) == -1) - return -1; + if (StreamWriteMqttString(packet->topicFilters->entry[i], stream) == -1) + return -1; + + if (StreamWrite(&qos, 1, stream) == -1) + return -1; + } return 0; } @@ -5704,19 +5720,24 @@ static int MqttPacketConnAckDeserialize(MqttPacketConnAck **packet, Stream *stre static int MqttPacketSubAckDeserialize(MqttPacketSubAck **packet, Stream *stream) { size_t remainingLength = 0; + size_t i; if (StreamReadRemainingLength(&remainingLength, stream) == -1) return -1; - /* 2 bytes for packet id and 1 byte for single return code */ - if (remainingLength != 3) - return -1; - if (StreamReadUint16Be(&((*packet)->base.id), stream) == -1) return -1; - if (StreamRead(&((*packet)->returnCode), 1, stream) == -1) - return -1; + remainingLength -= 2; + + (*packet)->returnCode = (unsigned char *) malloc( + sizeof(*(*packet)->returnCode) * remainingLength); + + for (i = 0; i < remainingLength; ++i) + { + if (StreamRead(&((*packet)->returnCode[i]), 1, stream) == -1) + return -1; + } return 0; } @@ -6008,6 +6029,8 @@ struct MqttClient bstring willMessage; int willQos; int willRetain; + /* 1 if client should ignore incoming PUBLISH messages, 0 handle them */ + int paused; }; enum MessageState @@ -6023,6 +6046,7 @@ static int MqttClientSendPacket(MqttClient *client, MqttPacket *packet); static int MqttClientRecvPacket(MqttClient *client); static uint16_t MqttClientNextPacketId(MqttClient *client); static void MqttClientProcessMessageQueue(MqttClient *client); +static void MqttClientClearQueues(MqttClient *client); static MQTT_INLINE int MqttClientInflightMessageCount(MqttClient *client) { @@ -6078,19 +6102,7 @@ MqttClient *MqttClientNew(const char *clientId) void MqttClientFree(MqttClient *client) { - MqttPacket *packet, *next; - - TAILQ_FOREACH_SAFE(packet, &client->outMessages, messages, next) - { - TAILQ_REMOVE(&client->outMessages, packet, messages); - MqttPacketFree(packet); - } - - TAILQ_FOREACH_SAFE(packet, &client->inMessages, messages, next) - { - TAILQ_REMOVE(&client->inMessages, packet, messages); - MqttPacketFree(packet); - } + MqttClientClearQueues(client); bdestroy(client->clientId); bdestroy(client->willTopic); @@ -6160,11 +6172,19 @@ int MqttClientConnect(MqttClient *client, const char *host, short port, assert(client != NULL); assert(host != NULL); - client->host = bfromcstr(host); + if (client->host) + bassigncstr(client->host, host); + else + client->host = bfromcstr(host); client->port = port; client->keepAlive = keepAlive; client->cleanSession = cleanSession; + /* In case we are reconnecting */ + client->stopped = 0; + client->pingSent = 0; + MqttClientClearQueues(client); + if (keepAlive < 0) { LOG_ERROR("invalid keepAlive: %d", keepAlive); @@ -6348,11 +6368,20 @@ int MqttClientRun(MqttClient *client) int MqttClientSubscribe(MqttClient *client, const char *topicFilter, int qos) { + return MqttClientSubscribeMany(client, &topicFilter, &qos, 1); +} + +int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters, + int *qos, size_t count) +{ MqttPacketSubscribe *packet = NULL; + size_t i; assert(client != NULL); - assert(topicFilter != NULL); - assert(qos >= 0 && qos <= 2); + assert(topicFilters != NULL); + assert(*topicFilters != NULL); + assert(qos != NULL); + assert(count > 0); packet = (MqttPacketSubscribe *) MqttPacketWithIdNew( MqttPacketTypeSubscribe, MqttClientNextPacketId(client)); @@ -6360,8 +6389,18 @@ int MqttClientSubscribe(MqttClient *client, const char *topicFilter, if (!packet) return -1; - packet->topicFilter = bfromcstr(topicFilter); - packet->qos = qos; + packet->topicFilters = bstrListCreate(); + bstrListAllocMin(packet->topicFilters, count); + + packet->qos = (int *) malloc(sizeof(int) * count); + + for (i = 0; i < count; ++i) + { + packet->topicFilters->entry[i] = bfromcstr(topicFilters[i]); + ++packet->topicFilters->qty; + } + + memcpy(packet->qos, qos, sizeof(int) * count); MqttClientQueuePacket(client, (MqttPacket *) packet); @@ -6595,21 +6634,35 @@ static void MqttClientHandleSubAck(MqttClient *client, MqttPacketSubAck *packet) } else { - TAILQ_REMOVE(&client->outMessages, sub, messages); - MqttPacketFree(sub); - if (client->onSubscribe) { - LOG_DEBUG("calling onSubscribe id:%d rc:%d", MqttPacketId(packet), - packet->returnCode); - client->onSubscribe(client, MqttPacketId(packet), - packet->returnCode); + MqttPacketSubscribe *sub2; + int i; + + sub2 = (MqttPacketSubscribe *) sub; + + for (i = 0; i < sub2->topicFilters->qty; ++i) + { + const char *filter = bdata(sub2->topicFilters->entry[i]); + int rc = packet->returnCode[i]; + + LOG_DEBUG("calling onSubscribe id:%d filter:'%s' rc:%d", + MqttPacketId(packet), filter, rc); + + client->onSubscribe(client, MqttPacketId(packet), filter, rc); + } } + + TAILQ_REMOVE(&client->outMessages, sub, messages); + MqttPacketFree(sub); } } static void MqttClientHandlePublish(MqttClient *client, MqttPacketPublish *packet) { + if (client->paused) + return; + if (MqttPacketPublishQos(packet) == 2) { /* Check if we have sent a PUBREC previously with the same id. If we @@ -7019,3 +7072,47 @@ static void MqttClientProcessMessageQueue(MqttClient *client) MqttClientProcessInMessages(client); MqttClientProcessOutMessages(client); } + +static void MqttClientClearQueues(MqttClient *client) +{ + while (!SIMPLEQ_EMPTY(&client->sendQueue)) + { + MqttPacket *packet = SIMPLEQ_FIRST(&client->sendQueue); + + SIMPLEQ_REMOVE_HEAD(&client->sendQueue, sendQueue); + + if (TAILQ_NEXT(packet, messages) == NULL && + TAILQ_PREV(packet, MessageList, messages) == NULL && + TAILQ_FIRST(&client->inMessages) != packet && + TAILQ_FIRST(&client->outMessages) != packet) + { + MqttPacketFree(packet); + } + } + + while (!TAILQ_EMPTY(&client->outMessages)) + { + MqttPacket *packet = TAILQ_FIRST(&client->outMessages); + TAILQ_REMOVE(&client->outMessages, packet, messages); + MqttPacketFree(packet); + } + + while (!TAILQ_EMPTY(&client->inMessages)) + { + MqttPacket *packet = TAILQ_FIRST(&client->inMessages); + TAILQ_REMOVE(&client->inMessages, packet, messages); + MqttPacketFree(packet); + } +} + +void MqttClientPause(MqttClient *client) +{ + assert(client != NULL); + client->paused = 1; +} + +void MqttClientResume(MqttClient *client) +{ + assert(client != NULL); + client->paused = 0; +} diff --git a/amalgamation/mqtt.h b/amalgamation/mqtt.h index f07ff3c..ad84aaf 100644 --- a/amalgamation/mqtt.h +++ b/amalgamation/mqtt.h @@ -33,6 +33,7 @@ typedef void (*MqttClientOnConnectCallback)(MqttClient *client, typedef void (*MqttClientOnSubscribeCallback)(MqttClient *client, int id, + const char *topicFilter, MqttSubscriptionStatus status); typedef void (*MqttClientOnUnsubscribeCallback)(MqttClient *client, int id); @@ -82,6 +83,9 @@ int MqttClientRun(MqttClient *client); int MqttClientSubscribe(MqttClient *client, const char *topicFilter, int qos); +int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters, + int *qos, size_t count); + int MqttClientUnsubscribe(MqttClient *client, const char *topicFilter); int MqttClientPublish(MqttClient *client, int qos, int retain, |
