diff options
| author | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-19 13:15:28 +0200 |
|---|---|---|
| committer | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-19 13:15:28 +0200 |
| commit | 6748a038cf62cfb6e817f758ce8255e0e2debe60 (patch) | |
| tree | 3ee0ef0cf671b35a67bd9a5985ca32e05dc6e296 | |
| parent | c4dbe5151ef76e46dc2adc9e373dd8447219bcaf (diff) | |
| download | mqtt-6748a038cf62cfb6e817f758ce8255e0e2debe60.tar.gz mqtt-6748a038cf62cfb6e817f758ce8255e0e2debe60.zip | |
Support reconnecting in MqttClientConnect()
| -rw-r--r-- | src/client.c | 57 |
1 files changed, 43 insertions, 14 deletions
diff --git a/src/client.c b/src/client.c index ac2eac7..bdfaf2c 100644 --- a/src/client.c +++ b/src/client.c @@ -90,6 +90,7 @@ static int MqttClientSendPacket(MqttClient *client, MqttPacket *packet); static int MqttClientRecvPacket(MqttClient *client); static uint16_t MqttClientNextPacketId(MqttClient *client); static void MqttClientProcessMessageQueue(MqttClient *client); +static void MqttClientClearQueues(MqttClient *client); static MQTT_INLINE int MqttClientInflightMessageCount(MqttClient *client) { @@ -145,19 +146,7 @@ MqttClient *MqttClientNew(const char *clientId) void MqttClientFree(MqttClient *client) { - MqttPacket *packet, *next; - - TAILQ_FOREACH_SAFE(packet, &client->outMessages, messages, next) - { - TAILQ_REMOVE(&client->outMessages, packet, messages); - MqttPacketFree(packet); - } - - TAILQ_FOREACH_SAFE(packet, &client->inMessages, messages, next) - { - TAILQ_REMOVE(&client->inMessages, packet, messages); - MqttPacketFree(packet); - } + MqttClientClearQueues(client); bdestroy(client->clientId); bdestroy(client->willTopic); @@ -227,11 +216,19 @@ int MqttClientConnect(MqttClient *client, const char *host, short port, assert(client != NULL); assert(host != NULL); - client->host = bfromcstr(host); + if (client->host) + bassigncstr(client->host, host); + else + client->host = bfromcstr(host); client->port = port; client->keepAlive = keepAlive; client->cleanSession = cleanSession; + /* In case we are reconnecting */ + client->stopped = 0; + client->pingSent = 0; + MqttClientClearQueues(client); + if (keepAlive < 0) { LOG_ERROR("invalid keepAlive: %d", keepAlive); @@ -1086,3 +1083,35 @@ static void MqttClientProcessMessageQueue(MqttClient *client) MqttClientProcessInMessages(client); MqttClientProcessOutMessages(client); } + +static void MqttClientClearQueues(MqttClient *client) +{ + while (!SIMPLEQ_EMPTY(&client->sendQueue)) + { + MqttPacket *packet = SIMPLEQ_FIRST(&client->sendQueue); + + SIMPLEQ_REMOVE_HEAD(&client->sendQueue, sendQueue); + + if (TAILQ_NEXT(packet, messages) == NULL && + TAILQ_PREV(packet, MessageList, messages) == NULL && + TAILQ_FIRST(&client->inMessages) != packet && + TAILQ_FIRST(&client->outMessages) != packet) + { + MqttPacketFree(packet); + } + } + + while (!TAILQ_EMPTY(&client->outMessages)) + { + MqttPacket *packet = TAILQ_FIRST(&client->outMessages); + TAILQ_REMOVE(&client->outMessages, packet, messages); + MqttPacketFree(packet); + } + + while (!TAILQ_EMPTY(&client->inMessages)) + { + MqttPacket *packet = TAILQ_FIRST(&client->inMessages); + TAILQ_REMOVE(&client->inMessages, packet, messages); + MqttPacketFree(packet); + } +} |
